mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 21:51:50 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			544 lines
		
	
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			544 lines
		
	
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Classes representing releases and distributions retrieved from indexes.
 | |
| 
 | |
| A project (= unique name) can have several releases (= versions) and
 | |
| each release can have several distributions (= sdist and bdists).
 | |
| 
 | |
| Release objects contain metadata-related information (see PEP 376);
 | |
| distribution objects contain download-related information.
 | |
| """
 | |
| 
 | |
| import re
 | |
| import hashlib
 | |
| import tempfile
 | |
| import urllib.request
 | |
| import urllib.parse
 | |
| import urllib.error
 | |
| import urllib.parse
 | |
| from shutil import unpack_archive
 | |
| 
 | |
| from packaging.errors import IrrationalVersionError
 | |
| from packaging.version import (suggest_normalized_version, NormalizedVersion,
 | |
|                                get_version_predicate)
 | |
| from packaging.metadata import Metadata
 | |
| from packaging.pypi.errors import (HashDoesNotMatch, UnsupportedHashName,
 | |
|                                    CantParseArchiveName)
 | |
| 
 | |
| 
 | |
| __all__ = ['ReleaseInfo', 'DistInfo', 'ReleasesList', 'get_infos_from_url']
 | |
| 
 | |
# Archive suffixes recognised when parsing a distribution file name.
# Multi-part suffixes come before the shorter ones they contain.
EXTENSIONS = ".tar.gz .tar.bz2 .tar .zip .tgz .egg".split()

# Captures the hexadecimal digest of a trailing "#md5=<digest>" URL fragment.
MD5_HASH = re.compile(r'^.*#md5=([a-f0-9]+)$')

# Distribution types accepted by ReleaseInfo.add_distribution.
DIST_TYPES = ['bdist', 'sdist']
 | |
| 
 | |
| 
 | |
class IndexReference:
    """Mixin that keeps a reference to an index on the instance.

    The reference is stored in the ``_index`` attribute.
    """

    def set_index(self, index=None):
        """Remember *index* as ``self._index`` (``None`` when omitted)."""
        self._index = index
 | |
| 
 | |
| 
 | |
class ReleaseInfo(IndexReference):
    """Represent a release of a project (a project with a specific version).

    A release holds the metadata related to this specific version and is
    also a container for its distributions, stored in ``dists`` and keyed by
    distribution type.

    See the DistInfo class for more information about distributions.
    """

    def __init__(self, name, version, metadata=None, hidden=False,
                 index=None, **kwargs):
        """
        :param name: the name of the distribution
        :param version: the version of the distribution
        :param metadata: the metadata fields of the release.
        :type metadata: dict
        :param hidden: stored unchanged in the ``hidden`` attribute.
        :param index: the index this release is attached to (may be None).
        :param kwargs: optional arguments for a new distribution. A
                       distribution is only created when ``dist_type`` is
                       one of the keys.
        """
        self.set_index(index)
        self.name = name
        self._version = None
        # goes through the ``version`` property, i.e. set_version()
        self.version = version
        if metadata:
            self.metadata = Metadata(mapping=metadata)
        else:
            self.metadata = None
        self.dists = {}
        self.hidden = hidden

        if 'dist_type' in kwargs:
            dist_type = kwargs.pop('dist_type')
            self.add_distribution(dist_type, **kwargs)

    def set_version(self, version):
        """Store *version* as a NormalizedVersion.

        When *version* is not a valid normalized version, the suggestion
        returned by suggest_normalized_version() is used instead.

        :raises IrrationalVersionError: if no suggestion is available.
        """
        try:
            self._version = NormalizedVersion(version)
        except IrrationalVersionError:
            suggestion = suggest_normalized_version(version)
            if suggestion:
                # re-enters this setter with the suggested string
                self.version = suggestion
            else:
                raise IrrationalVersionError(version)

    def get_version(self):
        """Return the stored version object."""
        return self._version

    version = property(get_version, set_version)

    def fetch_metadata(self):
        """If the metadata is not set, use the indexes to get it"""
        if not self.metadata:
            # The return value is not used here; presumably the index
            # updates this release in place -- verify against the index
            # implementation.
            self._index.get_metadata(self.name, str(self.version))
        return self.metadata

    @property
    def is_final(self):
        """proxy to version.is_final"""
        return self.version.is_final

    def fetch_distributions(self):
        """Return ``dists``, asking the index for them when it is None.

        NOTE(review): ``dists`` is initialised to ``{}`` in ``__init__``, so
        the index is only queried if ``dists`` has been reset to None.
        """
        if self.dists is None:
            self._index.get_distributions(self.name, str(self.version))
            if self.dists is None:
                self.dists = {}
        return self.dists

    def add_distribution(self, dist_type='sdist', python_version=None,
                         **params):
        """Add distribution information to this release.

        If distribution information is already set for this distribution
        type, add the given url to the existing distribution. This can be
        useful when some of the urls fail to download.

        :param dist_type: the distribution type; must be in DIST_TYPES.
        :param python_version: when true, stored on the distribution as
                               ``python_version``.
        :param params: the fields to be passed to the distribution object
                       (see the :class:DistInfo constructor).
        :raises ValueError: if *dist_type* is not in DIST_TYPES.
        """
        if dist_type not in DIST_TYPES:
            raise ValueError(dist_type)
        if dist_type in self.dists:
            self.dists[dist_type].add_url(**params)
        else:
            self.dists[dist_type] = DistInfo(self, dist_type,
                                             index=self._index, **params)
        if python_version:
            self.dists[dist_type].python_version = python_version

    def get_distribution(self, dist_type=None, prefer_source=True):
        """Return a distribution.

        If dist_type is set, act as an alias of __getitem__ and return the
        distribution of that type.

        Otherwise, if prefer_source is True and a source distribution
        exists, return it. In every other case return the first registered
        distribution.

        :raises LookupError: if the release has no distribution at all.
        :raises KeyError: if *dist_type* is given but not registered.
        """
        if len(self.dists) == 0:
            raise LookupError
        if dist_type:
            return self[dist_type]
        if prefer_source and "sdist" in self.dists:
            return self["sdist"]
        # dict views are iterable but are not iterators, hence iter()
        return next(iter(self.dists.values()))

    def unpack(self, path=None, prefer_source=True):
        """Unpack the distribution to the given path.

        If no destination is given, creates a temporary location.

        Returns the location of the extracted files (root).
        """
        return self.get_distribution(prefer_source=prefer_source)\
                   .unpack(path=path)

    def download(self, temp_path=None, prefer_source=True):
        """Download the distribution selected by get_distribution().

        The archive is put in temp_path. If no temp_path is given, a
        temporary directory is created.

        Returns the path to the downloaded archive.
        """
        return self.get_distribution(prefer_source=prefer_source)\
                   .download(path=temp_path)

    def set_metadata(self, metadata):
        """Update the release metadata with *metadata*.

        An empty Metadata object is created first when none is set.
        """
        if not self.metadata:
            self.metadata = Metadata()
        self.metadata.update(metadata)

    def __getitem__(self, item):
        """distributions are available using release["sdist"]"""
        return self.dists[item]

    def _check_is_comparable(self, other):
        """Raise TypeError unless *other* is a ReleaseInfo of the same name."""
        if not isinstance(other, ReleaseInfo):
            raise TypeError("cannot compare %s and %s"
                % (type(self).__name__, type(other).__name__))
        elif self.name != other.name:
            raise TypeError("cannot compare %s and %s"
                % (self.name, other.name))

    def __repr__(self):
        return "<%s %s>" % (self.name, self.version)

    # Comparisons are based on the version only and raise TypeError (via
    # _check_is_comparable) for objects that are not releases of the same
    # project.
    def __eq__(self, other):
        self._check_is_comparable(other)
        return self.version == other.version

    def __lt__(self, other):
        self._check_is_comparable(other)
        return self.version < other.version

    def __ne__(self, other):
        return not self.__eq__(other)

    def __gt__(self, other):
        return not (self.__lt__(other) or self.__eq__(other))

    def __le__(self, other):
        return self.__eq__(other) or self.__lt__(other)

    def __ge__(self, other):
        return self.__eq__(other) or self.__gt__(other)

    # See http://docs.python.org/reference/datamodel#object.__hash__
    # Defining __eq__ would otherwise make instances unhashable; keep the
    # identity-based hash.
    __hash__ = object.__hash__
 | |
| 
 | |
| 
 | |
class DistInfo(IndexReference):
    """Represents a distribution retrieved from an index (sdist, bdist, ...)
    """

    def __init__(self, release, dist_type=None, url=None, hashname=None,
                 hashval=None, is_external=True, python_version=None,
                 index=None):
        """Create a new instance of DistInfo.

        :param release: a DistInfo class is relative to a release.
        :param dist_type: the type of the dist (eg. source, bin-*, etc.)
        :param url: URL where we found this distribution
        :param hashname: the name of the hash we want to use. Refer to the
                         hashlib.new documentation for more information.
        :param hashval: the hash value.
        :param is_external: we need to know if the provided url comes from
                            an index browsing, or from an external resource.
        :param python_version: stored unchanged in ``python_version``.
        :param index: the index this distribution is attached to.
        :raises UnsupportedHashName: if hashlib does not know *hashname*.
        """
        self.set_index(index)
        self.release = release
        self.dist_type = dist_type
        self.python_version = python_version
        self._unpacked_dir = None
        # set the downloaded path to None by default. The goal here
        # is to not download distributions multiple times
        self.downloaded_location = None
        # We store urls in dicts, because we need a bit more information
        # than the simple URL. It is used later to find the right url to
        # use.
        # There are two url attributes: ``urls`` is the list of dicts
        # describing every known url, and ``_url`` caches the chosen one so
        # that the selection is not done multiple times.
        self.urls = []
        self._url = None
        self.add_url(url, hashname, hashval, is_external)

    def add_url(self, url=None, hashname=None, hashval=None, is_external=True):
        """Add a new url to the list of urls.

        A url that is already registered is ignored.

        :raises UnsupportedHashName: if hashlib does not know *hashname*.
        """
        if hashname is not None:
            try:
                hashlib.new(hashname)
            except ValueError:
                raise UnsupportedHashName(hashname)
        if url not in (known['url'] for known in self.urls):
            self.urls.append({
                'url': url,
                'hashname': hashname,
                'hashval': hashval,
                'is_external': is_external,
            })
            # reset the url selection process
            self._url = None

    @property
    def url(self):
        """Pick up the right url from the list of urls in self.urls.

        Returns the url *dict* (keys: url, hashname, hashval, is_external),
        not the bare url string.
        """
        # We return internal urls over externals.
        # If there is more than one internal or external, return the first
        # one.
        if self._url is None:
            self._url = next(
                (info for info in self.urls if not info['is_external']),
                self.urls[0])
        return self._url

    @property
    def is_source(self):
        """return if the distribution is a source one or not"""
        return self.dist_type == 'sdist'

    def download(self, path=None):
        """Download the distribution to a path, and return it.

        If *path* is given, the archive is stored in that directory;
        otherwise a temporary directory is created. Nothing is downloaded
        when a verified archive is already available.

        Return the download location.

        :raises HashDoesNotMatch: if the archive does not match the expected
                                  hash; the location is then not recorded, so
                                  a later call downloads again.
        """
        # if we do not have downloaded it yet, do it.
        if self.downloaded_location is None:
            if path is None:
                path = tempfile.mkdtemp()
            url = self.url['url']
            archive_name = urllib.parse.urlparse(url)[2].split('/')[-1]
            filename, _ = urllib.request.urlretrieve(
                url, path + "/" + archive_name)
            # verify before caching, so that a corrupt archive is never
            # handed back by a subsequent call
            self._check_md5(filename)
            self.downloaded_location = filename
        return self.downloaded_location

    def unpack(self, path=None):
        """Unpack the distribution to the given path.

        If no destination is given, creates a temporary location. The
        archive is only unpacked once; later calls return the directory used
        the first time.

        Returns the location of the extracted files (root).
        """
        if not self._unpacked_dir:
            if path is None:
                path = tempfile.mkdtemp()

            filename = self.download(path)
            unpack_archive(filename, path)
            self._unpacked_dir = path

        return self._unpacked_dir

    def _check_md5(self, filename):
        """Check that the checksum of the given file matches the one stored
        with the selected url.

        Despite the name, the algorithm is the url's ``hashname``. Nothing is
        checked when either the hash name or the hash value is missing.

        :raises HashDoesNotMatch: if the digests differ.
        """
        hashname = self.url['hashname']
        expected_hashval = self.url['hashval']
        if hashname is None or expected_hashval is None:
            return

        digest = hashlib.new(hashname)
        with open(filename, 'rb') as f:
            # hash in chunks rather than loading the whole archive
            for chunk in iter(lambda: f.read(65536), b''):
                digest.update(chunk)

        actual_hashval = digest.hexdigest()
        if actual_hashval != expected_hashval:
            raise HashDoesNotMatch("got %s instead of %s"
                % (actual_hashval, expected_hashval))

    def __repr__(self):
        if self.release is None:
            return "<? ? %s>" % self.dist_type

        return "<%s %s %s>" % (
            self.release.name, self.release.version, self.dist_type or "")
 | |
| 
 | |
| 
 | |
class ReleasesList(IndexReference):
    """A container of ReleaseInfo objects belonging to one project.

    Provides useful methods and facilities to sort and filter releases.
    """
    def __init__(self, name, releases=None, contains_hidden=False, index=None):
        """
        :param name: the project name.
        :param releases: optional iterable of ReleaseInfo objects to add.
        :param contains_hidden: stored unchanged in ``contains_hidden``.
        :param index: the index this list is attached to.
        """
        self.set_index(index)
        self.releases = []
        self.name = name
        self.contains_hidden = contains_hidden
        if releases:
            self.add_releases(releases)

    def fetch_releases(self):
        """Ask the index for the releases of this project and return them.

        The return value of the index call is not used here; presumably the
        index fills this list in place -- verify against the index
        implementation.
        """
        self._index.get_releases(self.name)
        return self.releases

    def filter(self, predicate):
        """Filter and return a subset of releases matching the given predicate.

        The result is a new ReleasesList bound to the same index.
        """
        return ReleasesList(self.name, [release for release in self.releases
                                        if predicate.match(release.version)],
                                        index=self._index)

    def get_last(self, requirements, prefer_final=None):
        """Return the "last" release that satisfies the given requirements.

        "last" is defined by the version number of the releases; set
        prefer_final to a true value to rank final releases first.

        Returns None when no release matches.
        """
        predicate = get_version_predicate(requirements)
        releases = self.filter(predicate)
        if len(releases) == 0:
            return None
        releases.sort_releases(prefer_final, reverse=True)
        return releases[0]

    def add_releases(self, releases):
        """Add releases in the release list.

        :param: releases is a list of ReleaseInfo objects.
        """
        for r in releases:
            self.add_release(release=r)

    def add_release(self, version=None, dist_type='sdist', release=None,
                    **dist_args):
        """Add a release to the list.

        The release can be passed in the `release` parameter; it is then
        appended unless its version is already present, and the urls of its
        distributions are registered for that version. Alternatively the
        release information can be passed directly in the `version` and
        `dist_type` arguments.

        Other keyword arguments are forwarded to the distribution creation
        (eg. the arguments of the DistInfo constructor).

        Project names are compared case-insensitively in both modes.

        :raises ValueError: if `release` belongs to another project.
        """
        project = self.name.lower()
        if release is not None:
            if release.name.lower() != project:
                raise ValueError("%s is not the same project as %s" %
                                 (release.name, self.name))
            version = str(release.version)

            if version not in self.get_versions():
                # append only if not already exists
                self.releases.append(release)
            # merge every known url into the release stored for this version
            for dist in release.dists.values():
                for url in dist.urls:
                    self.add_release(version, dist.dist_type, **url)
        else:
            # Same case-insensitive name rule as above; otherwise a release
            # accepted above under a differently-cased name would not be
            # found here and a duplicate would be created.
            matches = [r for r in self.releases
                       if str(r.version) == version
                       and r.name.lower() == project]
            if not matches:
                release = ReleaseInfo(self.name, version, index=self._index)
                self.releases.append(release)
            else:
                release = matches[0]

            release.add_distribution(dist_type=dist_type, **dist_args)

    def sort_releases(self, prefer_final=False, reverse=True, *args, **kwargs):
        """Sort the releases in place with the given properties.

        The `prefer_final` argument can be used to specify whether final
        distributions (eg. not dev, beta or alpha) should be preferred.

        Results can be inverted by using `reverse`.

        Any other parameter provided is forwarded to the ``list.sort`` call.
        You cannot redefine the key argument here, as it is used internally
        to sort the releases.
        """
        sort_by = []
        if prefer_final:
            sort_by.append("is_final")
        sort_by.append("version")

        self.releases.sort(
            key=lambda i: tuple(getattr(i, arg) for arg in sort_by),
            reverse=reverse, *args, **kwargs)

    def get_release(self, version):
        """Return a release from its version.

        :raises KeyError: unless exactly one release has this version.
        """
        matches = [r for r in self.releases if str(r.version) == version]
        if len(matches) != 1:
            raise KeyError(version)
        return matches[0]

    def get_versions(self):
        """Return the list of release versions contained, as strings"""
        return [str(r.version) for r in self.releases]

    def __getitem__(self, key):
        return self.releases[key]

    def __len__(self):
        return len(self.releases)

    def __repr__(self):
        string = 'Project "%s"' % self.name
        versions = self.get_versions()
        if versions:
            string += ' versions: %s' % ', '.join(versions)
        return '<%s>' % string
 | |
| 
 | |
| 
 | |
def get_infos_from_url(url, probable_dist_name=None, is_external=True):
    """Extract release information from the URL of an archive.

    Return a dict with the keys name, version, url, hashname, hashval,
    is_external and dist_type when the archive name ends with one of
    EXTENSIONS; otherwise the function falls through and returns None.

    :param url: complete url of the distribution
    :param probable_dist_name: A probable name of the project (not used by
                               this function at the moment).
    :param is_external: Tell if the url comes from an index or from
                        an external URL.
    """
    # pull the md5 digest out of the url fragment, if there is one
    md5_hash = None
    hash_match = MD5_HASH.match(url)
    if hash_match is not None:
        md5_hash = hash_match.group(1)
        # remove the hash
        url = url.replace("#md5=%s" % md5_hash, "")

    # the archive name is the last segment of the url path
    url_path = urllib.parse.urlparse(url)[2]
    archive_name = url_path.rpartition('/')[2]

    # drop every known extension found at the end of the name
    extension_matched = False
    for ext in EXTENSIONS:
        if not archive_name.endswith(ext):
            continue
        archive_name = archive_name[:-len(ext)]
        extension_matched = True

    # parsed even for unknown extensions, so parse errors still propagate
    name, version = split_archive_name(archive_name)
    if not extension_matched:
        return None

    return {
        'name': name,
        'version': version,
        'url': url,
        'hashname': "md5",
        'hashval': md5_hash,
        'is_external': is_external,
        'dist_type': 'sdist',
    }
 | |
| 
 | |
| 
 | |
def split_archive_name(archive_name, probable_name=None):
    """Split an archive name into two parts: name and version.

    When *probable_name* is given and the archive name starts with it
    (ignoring case), the remainder is tried as the version first. If that
    does not yield a usable version, or no probable name is given, the name
    is split heuristically on the "-" separator.

    Return the tuple (name, version), with the name lowercased and the
    version as returned by suggest_normalized_version().

    :raises CantParseArchiveName: if no usable version or name is found.
    """
    # Try to determine which part is the name and which is the version using
    # the "-" separator. Take the larger part to be the version number, then
    # reduce it if this does not work.
    def eager_split(text, maxsplit=2):
        while True:
            # split using the "-" separator
            splits = text.rsplit("-", maxsplit)
            name = splits[0]
            version = "-".join(splits[1:])
            if version.startswith("-"):
                version = version[1:]
            if suggest_normalized_version(version) is None and maxsplit >= 0:
                # we don't get a good version number: try a smaller split
                maxsplit -= 1
                continue
            return name, version

    name = version = None
    if probable_name is not None:
        probable_name = probable_name.lower()
        if archive_name.lower().startswith(probable_name):
            # Slice the prefix off; str.lstrip() would strip a *set of
            # characters* and could eat into the version.
            candidate = archive_name[len(probable_name):].lstrip("-")
            if suggest_normalized_version(candidate) is not None:
                name, version = probable_name, candidate

    if name is None:
        name, version = eager_split(archive_name)

    version = suggest_normalized_version(version)
    if version is not None and name != "":
        return name.lower(), version
    raise CantParseArchiveName(archive_name)
 | 
