import functools
import logging
import os
import posixpath
import re
import urllib.parse
from typing import TYPE_CHECKING, Dict, List, NamedTuple, Optional, Tuple, Union

from pip._internal.utils.filetypes import WHEEL_EXTENSION
from pip._internal.utils.hashes import Hashes
from pip._internal.utils.misc import (
    redact_auth_from_url,
    split_auth_from_netloc,
    splitext,
)
from pip._internal.utils.models import KeyBasedCompareMixin
from pip._internal.utils.urls import path_to_url, url_to_path

if TYPE_CHECKING:
    from pip._internal.index.collector import HTMLPage

logger = logging.getLogger(__name__)

_SUPPORTED_HASHES = ("sha1", "sha224", "sha384", "sha256", "sha512", "md5")


class Link(KeyBasedCompareMixin):
    """Represents a parsed link from a Package Index's simple URL."""

    __slots__ = [
        "_parsed_url",
        "_url",
        "comes_from",
        "requires_python",
        "yanked_reason",
        "cache_link_parsing",
    ]

    def __init__(
        self,
        url: str,
        comes_from: Optional[Union[str, "HTMLPage"]] = None,
        requires_python: Optional[str] = None,
        yanked_reason: Optional[str] = None,
        cache_link_parsing: bool = True,
    ) -> None:
        """
        :param url: url of the resource pointed to (href of the link)
        :param comes_from: instance of HTMLPage where the link was found,
            or string.
        :param requires_python: String containing the `Requires-Python`
            metadata field, specified in PEP 345. This may be specified by
            a data-requires-python attribute in the HTML link tag, as
            described in PEP 503.
        :param yanked_reason: the reason the file has been yanked, if the
            file has been yanked, or None if the file hasn't been yanked.
            This is the value of the "data-yanked" attribute, if present, in
            a simple repository HTML link. If the file has been yanked but
            no reason was provided, this should be the empty string. See
            PEP 592 for more information and the specification.
        :param cache_link_parsing: A flag that is used elsewhere to determine
            whether resources retrieved from this link should be cached.
            PyPI index urls should generally have this set to False, for
            example.
        """

        # url can be a UNC windows share
        if url.startswith('\\\\'):
            url = path_to_url(url)

        self._parsed_url = urllib.parse.urlsplit(url)
        # Store the url as a private attribute to prevent accidentally
        # trying to set a new value.
        self._url = url

        self.comes_from = comes_from
        self.requires_python = requires_python if requires_python else None
        self.yanked_reason = yanked_reason

        super().__init__(key=url, defining_class=Link)

        self.cache_link_parsing = cache_link_parsing
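
    # A minimal usage sketch (illustrative; the URL below is hypothetical,
    # not a real package). Fragments carry the hash and metadata that the
    # properties further down expose:
    #
    #   link = Link(
    #       "https://files.example.org/pkg-1.0-py3-none-any.whl#sha256=abc123"
    #   )
    #   link.filename   == "pkg-1.0-py3-none-any.whl"
    #   link.is_wheel   is True
    #   link.hash_name  == "sha256"
    #   link.hash       == "abc123"
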
    def __str__(self) -> str:
        if self.requires_python:
            rp = f' (requires-python:{self.requires_python})'
        else:
            rp = ''
        if self.comes_from:
            return '{} (from {}){}'.format(
                redact_auth_from_url(self._url), self.comes_from, rp)
        else:
            return redact_auth_from_url(str(self._url))

    def __repr__(self) -> str:
        return f'<Link {self}>'

    @property
    def url(self) -> str:
        return self._url

    @property
    def filename(self) -> str:
        path = self.path.rstrip('/')
        name = posixpath.basename(path)
        if not name:
            # Make sure we don't leak auth information if the netloc
            # includes a username and password.
            netloc, user_pass = split_auth_from_netloc(self.netloc)
            return netloc

        name = urllib.parse.unquote(name)
        assert name, f'URL {self._url!r} produced no filename'
        return name
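
    # For example (hypothetical URLs), the filename is the unquoted last path
    # segment, falling back to the auth-stripped netloc when the path is bare:
    #
    #   Link("https://example.com/dir/pkg%201.0.tar.gz").filename
    #       -> "pkg 1.0.tar.gz"
    #   Link("https://user:pass@example.com/").filename
    #       -> "example.com"
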
    @property
    def file_path(self) -> str:
        return url_to_path(self.url)

    @property
    def scheme(self) -> str:
        return self._parsed_url.scheme

    @property
    def netloc(self) -> str:
        """
        This can contain auth information.
        """
        return self._parsed_url.netloc

    @property
    def path(self) -> str:
        return urllib.parse.unquote(self._parsed_url.path)

    def splitext(self) -> Tuple[str, str]:
        return splitext(posixpath.basename(self.path.rstrip('/')))

    @property
    def ext(self) -> str:
        return self.splitext()[1]

    @property
    def url_without_fragment(self) -> str:
        scheme, netloc, path, query, fragment = self._parsed_url
        return urllib.parse.urlunsplit((scheme, netloc, path, query, ''))
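
    # E.g. (hypothetical URL) the fragment is dropped but the query survives:
    #
    #   Link("https://example.com/pkg.tar.gz?x=1#sha256=abc").url_without_fragment
    #       -> "https://example.com/pkg.tar.gz?x=1"
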
    _egg_fragment_re = re.compile(r'[#&]egg=([^&]*)')

    @property
    def egg_fragment(self) -> Optional[str]:
        match = self._egg_fragment_re.search(self._url)
        if not match:
            return None
        return match.group(1)

    _subdirectory_fragment_re = re.compile(r'[#&]subdirectory=([^&]*)')

    @property
    def subdirectory_fragment(self) -> Optional[str]:
        match = self._subdirectory_fragment_re.search(self._url)
        if not match:
            return None
        return match.group(1)
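
    # Both fragments may appear in one URL, '&'-separated (hypothetical URL):
    #
    #   url = "git+https://example.com/repo.git#egg=pkg&subdirectory=sub/dir"
    #   Link(url).egg_fragment          -> "pkg"
    #   Link(url).subdirectory_fragment -> "sub/dir"
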
    _hash_re = re.compile(
        r'({choices})=([a-f0-9]+)'.format(choices="|".join(_SUPPORTED_HASHES))
    )

    @property
    def hash(self) -> Optional[str]:
        match = self._hash_re.search(self._url)
        if match:
            return match.group(2)
        return None

    @property
    def hash_name(self) -> Optional[str]:
        match = self._hash_re.search(self._url)
        if match:
            return match.group(1)
        return None
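
    # The hash fragment has the form "<algorithm>=<hex digest>"; e.g. for the
    # hypothetical URL "https://example.com/pkg.tar.gz#sha256=0a1b2c":
    #
    #   link.hash_name -> "sha256"
    #   link.hash      -> "0a1b2c"
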
    @property
    def show_url(self) -> str:
        return posixpath.basename(self._url.split('#', 1)[0].split('?', 1)[0])

    @property
    def is_file(self) -> bool:
        return self.scheme == 'file'

    def is_existing_dir(self) -> bool:
        return self.is_file and os.path.isdir(self.file_path)

    @property
    def is_wheel(self) -> bool:
        return self.ext == WHEEL_EXTENSION

    @property
    def is_vcs(self) -> bool:
        from pip._internal.vcs import vcs

        return self.scheme in vcs.all_schemes
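
    # VCS links use a "<vcs>+<transport>" compound scheme, e.g. "git+https"
    # (assuming the git backend is registered with vcs; hypothetical URL):
    #
    #   Link("git+https://example.com/repo.git#egg=pkg").is_vcs -> True
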
    @property
    def is_yanked(self) -> bool:
        return self.yanked_reason is not None

    @property
    def has_hash(self) -> bool:
        return self.hash_name is not None

    def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
        """
        Return True if the link has a hash and it is allowed.
        """
        if hashes is None or not self.has_hash:
            return False
        # Assert non-None so mypy knows self.hash_name and self.hash are str.
        assert self.hash_name is not None
        assert self.hash is not None

        return hashes.is_hash_allowed(self.hash_name, hex_digest=self.hash)
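
    # A small sketch of how this is meant to be called (the digest is a
    # made-up value; Hashes maps algorithm names to allowed hex digests):
    #
    #   hashes = Hashes({"sha256": ["0a1b2c"]})
    #   link = Link("https://example.com/pkg.tar.gz#sha256=0a1b2c")
    #   link.is_hash_allowed(hashes) -> True
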

class _CleanResult(NamedTuple):
    """Convert link for equivalency check.

    This is used in the resolver to check whether two URL-specified requirements
    likely point to the same distribution and can be considered equivalent. This
    equivalency logic avoids comparing URLs literally, which can be too strict
    (e.g. "a=1&b=2" vs "b=2&a=1") and produce conflicts unexpected by users.

    Currently this does three things:

    1. Drop the basic auth part. This is technically wrong since a server can
       serve different content based on auth, but if it does that, it is even
       impossible to guarantee two URLs without auth are equivalent, since
       the user can input different auth information when prompted. So the
       practical solution is to assume the auth doesn't affect the response.
    2. Parse the query to avoid the ordering issue. Note that the ordering of
       values under the same key in the query is NOT cleaned; i.e. "a=1&a=2"
       and "a=2&a=1" are still considered different.
    3. Explicitly drop most of the fragment part, except ``subdirectory=`` and
       hash values, since it should have no impact on the downloaded content.
       Note that this drops the "egg=" part historically used to denote the
       requested project (and extras), which is wrong in the strictest sense,
       but so many people supply it inconsistently that it causes superfluous
       resolution conflicts, so we choose to ignore it as well.
    """

    parsed: urllib.parse.SplitResult
    query: Dict[str, List[str]]
    subdirectory: str
    hashes: Dict[str, str]


@functools.lru_cache(maxsize=None)
def _clean_link(link: Link) -> _CleanResult:
    parsed = link._parsed_url
    netloc = parsed.netloc.rsplit("@", 1)[-1]
    # According to RFC 8089, an empty host in file: means localhost.
    if parsed.scheme == "file" and not netloc:
        netloc = "localhost"
    fragment = urllib.parse.parse_qs(parsed.fragment)
    if "egg" in fragment:
        logger.debug("Ignoring egg= fragment in %s", link)
    try:
        # If there are multiple subdirectory values, use the first one.
        # This matches the behavior of Link.subdirectory_fragment.
        subdirectory = fragment["subdirectory"][0]
    except (IndexError, KeyError):
        subdirectory = ""
    # If there are multiple hash values under the same algorithm, use the
    # first one. This matches the behavior of Link.hash.
    hashes = {k: fragment[k][0] for k in _SUPPORTED_HASHES if k in fragment}
    return _CleanResult(
        parsed=parsed._replace(netloc=netloc, query="", fragment=""),
        query=urllib.parse.parse_qs(parsed.query),
        subdirectory=subdirectory,
        hashes=hashes,
    )


def links_equivalent(link1: Link, link2: Link) -> bool:
    return _clean_link(link1) == _clean_link(link2)
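
# For example (hypothetical URLs), reordered query parameters and an extra
# egg= fragment do not break equivalency:
#
#   links_equivalent(
#       Link("https://example.com/pkg.tar.gz?a=1&b=2#egg=pkg"),
#       Link("https://example.com/pkg.tar.gz?b=2&a=1"),
#   )
#       -> True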