mirror of https://forge.chapril.org/tykayn/orgmode-to-gemini-blog
synced 2025-10-09 17:02:45 +02:00

move on index build and linking previous and next articles

parent 7d221d970a
commit 16b93f380e

1711 changed files with 231792 additions and 838 deletions
.venv/lib/python3.11/site-packages/pip/_internal/req/__init__.py (new file, 90 lines)

@@ -0,0 +1,90 @@
import collections
import logging
from dataclasses import dataclass
from typing import Generator, List, Optional, Sequence, Tuple

from pip._internal.utils.logging import indent_log

from .req_file import parse_requirements
from .req_install import InstallRequirement
from .req_set import RequirementSet

__all__ = [
    "RequirementSet",
    "InstallRequirement",
    "parse_requirements",
    "install_given_reqs",
]

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class InstallationResult:
    name: str


def _validate_requirements(
    requirements: List[InstallRequirement],
) -> Generator[Tuple[str, InstallRequirement], None, None]:
    for req in requirements:
        assert req.name, f"invalid to-be-installed requirement: {req}"
        yield req.name, req


def install_given_reqs(
    requirements: List[InstallRequirement],
    global_options: Sequence[str],
    root: Optional[str],
    home: Optional[str],
    prefix: Optional[str],
    warn_script_location: bool,
    use_user_site: bool,
    pycompile: bool,
) -> List[InstallationResult]:
    """
    Install everything in the given list.

    (to be called after having downloaded and unpacked the packages)
    """
    to_install = collections.OrderedDict(_validate_requirements(requirements))

    if to_install:
        logger.info(
            "Installing collected packages: %s",
            ", ".join(to_install.keys()),
        )

    installed = []

    with indent_log():
        for req_name, requirement in to_install.items():
            if requirement.should_reinstall:
                logger.info("Attempting uninstall: %s", req_name)
                with indent_log():
                    uninstalled_pathset = requirement.uninstall(auto_confirm=True)
            else:
                uninstalled_pathset = None

            try:
                requirement.install(
                    global_options,
                    root=root,
                    home=home,
                    prefix=prefix,
                    warn_script_location=warn_script_location,
                    use_user_site=use_user_site,
                    pycompile=pycompile,
                )
            except Exception:
                # If the install did not succeed, roll back the previous uninstall.
                if uninstalled_pathset and not requirement.install_succeeded:
                    uninstalled_pathset.rollback()
                raise
            else:
                if uninstalled_pathset and requirement.install_succeeded:
                    uninstalled_pathset.commit()

            installed.append(InstallationResult(req_name))

    return installed
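
A note on the dict built above: _validate_requirements keys requirements by
project name, so a duplicate name later in the list replaces the earlier entry
while keeping the first-seen position. A minimal sketch of that behavior with
a hypothetical stand-in class (not pip code):

    import collections

    class FakeReq:
        # Hypothetical stand-in for InstallRequirement; only the name matters here.
        def __init__(self, name: str) -> None:
            self.name = name

    reqs = [FakeReq("requests"), FakeReq("idna"), FakeReq("requests")]
    to_install = collections.OrderedDict((r.name, r) for r in reqs)
    print(list(to_install))  # ['requests', 'idna'] -- first-seen order, last value wins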
.venv/lib/python3.11/site-packages/pip/_internal/req/constructors.py (new file, 560 lines)

@@ -0,0 +1,560 @@
| """Backing implementation for InstallRequirement's various constructors | ||||
| 
 | ||||
| The idea here is that these formed a major chunk of InstallRequirement's size | ||||
| so, moving them and support code dedicated to them outside of that class | ||||
| helps creates for better understandability for the rest of the code. | ||||
| 
 | ||||
| These are meant to be used elsewhere within pip to create instances of | ||||
| InstallRequirement. | ||||
| """ | ||||
| 
 | ||||
| import copy | ||||
| import logging | ||||
| import os | ||||
| import re | ||||
| from dataclasses import dataclass | ||||
| from typing import Collection, Dict, List, Optional, Set, Tuple, Union | ||||
| 
 | ||||
| from pip._vendor.packaging.markers import Marker | ||||
| from pip._vendor.packaging.requirements import InvalidRequirement, Requirement | ||||
| from pip._vendor.packaging.specifiers import Specifier | ||||
| 
 | ||||
| from pip._internal.exceptions import InstallationError | ||||
| from pip._internal.models.index import PyPI, TestPyPI | ||||
| from pip._internal.models.link import Link | ||||
| from pip._internal.models.wheel import Wheel | ||||
| from pip._internal.req.req_file import ParsedRequirement | ||||
| from pip._internal.req.req_install import InstallRequirement | ||||
| from pip._internal.utils.filetypes import is_archive_file | ||||
| from pip._internal.utils.misc import is_installable_dir | ||||
| from pip._internal.utils.packaging import get_requirement | ||||
| from pip._internal.utils.urls import path_to_url | ||||
| from pip._internal.vcs import is_url, vcs | ||||
| 
 | ||||
| __all__ = [ | ||||
|     "install_req_from_editable", | ||||
|     "install_req_from_line", | ||||
|     "parse_editable", | ||||
| ] | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| operators = Specifier._operators.keys() | ||||
| 
 | ||||
| 
 | ||||
| def _strip_extras(path: str) -> Tuple[str, Optional[str]]: | ||||
|     m = re.match(r"^(.+)(\[[^\]]+\])$", path) | ||||
|     extras = None | ||||
|     if m: | ||||
|         path_no_extras = m.group(1) | ||||
|         extras = m.group(2) | ||||
|     else: | ||||
|         path_no_extras = path | ||||
| 
 | ||||
|     return path_no_extras, extras | ||||
| 
 | ||||
| 
 | ||||
| def convert_extras(extras: Optional[str]) -> Set[str]: | ||||
|     if not extras: | ||||
|         return set() | ||||
|     return get_requirement("placeholder" + extras.lower()).extras | ||||
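
# For illustration (a sketch of the behavior, not part of the module):
#   convert_extras(None)          -> set()
#   convert_extras("[dev,Test]")  -> {"dev", "test"}   (extras are lowercased)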


def _set_requirement_extras(req: Requirement, new_extras: Set[str]) -> Requirement:
    """
    Returns a new requirement based on the given one, with the supplied extras. If the
    given requirement already has extras, those are replaced (or dropped if no new
    extras are given).
    """
    match: Optional[re.Match[str]] = re.fullmatch(
        # see https://peps.python.org/pep-0508/#complete-grammar
        r"([\w\t .-]+)(\[[^\]]*\])?(.*)",
        str(req),
        flags=re.ASCII,
    )
    # ireq.req is a valid requirement so the regex should always match
    assert (
        match is not None
    ), f"regex match on requirement {req} failed, this should never happen"
    pre: Optional[str] = match.group(1)
    post: Optional[str] = match.group(3)
    assert (
        pre is not None and post is not None
    ), f"regex group selection for requirement {req} failed, this should never happen"
    extras: str = "[{}]".format(",".join(sorted(new_extras)) if new_extras else "")
    return get_requirement(f"{pre}{extras}{post}")
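
# For illustration (a sketch, not part of the module):
#   _set_requirement_extras(Requirement("requests[security]>=2.0"), {"socks"})
#     -> <Requirement('requests[socks]>=2.0')>
#   _set_requirement_extras(Requirement("requests[security]>=2.0"), set())
#     -> <Requirement('requests>=2.0')>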


def parse_editable(editable_req: str) -> Tuple[Optional[str], str, Set[str]]:
    """Parses an editable requirement into:
        - a requirement name
        - a URL
        - extras
    Accepted requirements:
        svn+http://blahblah@rev#egg=Foobar[baz]&subdirectory=version_subdir
        .[some_extra]
    """

    url = editable_req

    # If a file path is specified with extras, strip off the extras.
    url_no_extras, extras = _strip_extras(url)

    if os.path.isdir(url_no_extras):
        # Treat it as code that has already been checked out.
        url_no_extras = path_to_url(url_no_extras)

    if url_no_extras.lower().startswith("file:"):
        package_name = Link(url_no_extras).egg_fragment
        if extras:
            return (
                package_name,
                url_no_extras,
                get_requirement("placeholder" + extras.lower()).extras,
            )
        else:
            return package_name, url_no_extras, set()

    for version_control in vcs:
        if url.lower().startswith(f"{version_control}:"):
            url = f"{version_control}+{url}"
            break

    link = Link(url)

    if not link.is_vcs:
        backends = ", ".join(vcs.all_schemes)
        raise InstallationError(
            f"{editable_req} is not a valid editable requirement. "
            f"It should either be a path to a local project or a VCS URL "
            f"(beginning with {backends})."
        )

    package_name = link.egg_fragment
    if not package_name:
        raise InstallationError(
            f"Could not detect requirement name for '{editable_req}', "
            "please specify one with #egg=your_package_name"
        )
    return package_name, url, set()
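
# For illustration (a sketch, not part of the module):
#   parse_editable("git+https://git.example.com/repo.git#egg=MyPkg")
#     -> ("MyPkg", "git+https://git.example.com/repo.git#egg=MyPkg", set())
#   parse_editable("./proj[dev]")   # assuming ./proj is a local checkout
#     -> (None, "file:///abs/path/to/proj", {"dev"})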


def check_first_requirement_in_file(filename: str) -> None:
    """Check if the file is parsable as a requirements file.

    This is heavily based on ``pkg_resources.parse_requirements``, but
    simplified to just check the first meaningful line.

    :raises InvalidRequirement: If the first meaningful line cannot be parsed
        as a requirement.
    """
    with open(filename, encoding="utf-8", errors="ignore") as f:
        # Create a steppable iterator, so we can handle \-continuations.
        lines = (
            line
            for line in (line.strip() for line in f)
            if line and not line.startswith("#")  # Skip blank lines/comments.
        )

        for line in lines:
            # Drop comments -- a hash without a space may be in a URL.
            if " #" in line:
                line = line[: line.find(" #")]
            # If there is a line continuation, drop it, and append the next line.
            if line.endswith("\\"):
                line = line[:-2].strip() + next(lines, "")
            get_requirement(line)
            return


def deduce_helpful_msg(req: str) -> str:
    """Returns a helpful message in case the requirements file does not exist,
    or cannot be parsed.

    :param req: Requirements file path
    """
    if not os.path.exists(req):
        return f" File '{req}' does not exist."
    msg = " The path does exist. "
    # Try to parse and check if it is a requirements file.
    try:
        check_first_requirement_in_file(req)
    except InvalidRequirement:
        logger.debug("Cannot parse '%s' as requirements file", req)
    else:
        msg += (
            f"The argument you provided "
            f"({req}) appears to be a"
            f" requirements file. If that is the"
            f" case, use the '-r' flag to install"
            f" the packages specified within it."
        )
    return msg


@dataclass(frozen=True)
class RequirementParts:
    requirement: Optional[Requirement]
    link: Optional[Link]
    markers: Optional[Marker]
    extras: Set[str]


def parse_req_from_editable(editable_req: str) -> RequirementParts:
    name, url, extras_override = parse_editable(editable_req)

    if name is not None:
        try:
            req: Optional[Requirement] = get_requirement(name)
        except InvalidRequirement as exc:
            raise InstallationError(f"Invalid requirement: {name!r}: {exc}")
    else:
        req = None

    link = Link(url)

    return RequirementParts(req, link, None, extras_override)


# ---- The actual constructors follow ----


def install_req_from_editable(
    editable_req: str,
    comes_from: Optional[Union[InstallRequirement, str]] = None,
    *,
    use_pep517: Optional[bool] = None,
    isolated: bool = False,
    global_options: Optional[List[str]] = None,
    hash_options: Optional[Dict[str, List[str]]] = None,
    constraint: bool = False,
    user_supplied: bool = False,
    permit_editable_wheels: bool = False,
    config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
) -> InstallRequirement:
    parts = parse_req_from_editable(editable_req)

    return InstallRequirement(
        parts.requirement,
        comes_from=comes_from,
        user_supplied=user_supplied,
        editable=True,
        permit_editable_wheels=permit_editable_wheels,
        link=parts.link,
        constraint=constraint,
        use_pep517=use_pep517,
        isolated=isolated,
        global_options=global_options,
        hash_options=hash_options,
        config_settings=config_settings,
        extras=parts.extras,
    )


def _looks_like_path(name: str) -> bool:
    """Checks whether the string "looks like" a path on the filesystem.

    This does not check whether the target actually exists; it only judges
    from the appearance.

    Returns true if any of the following conditions is true:
    * a path separator is found (either os.path.sep or os.path.altsep);
    * the name starts with a dot (which represents the current directory).
    """
    if os.path.sep in name:
        return True
    if os.path.altsep is not None and os.path.altsep in name:
        return True
    if name.startswith("."):
        return True
    return False
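
# For illustration (a sketch, not part of the module):
#   _looks_like_path("./proj")   -> True   (starts with a dot, contains a separator)
#   _looks_like_path("pkg/sub")  -> True
#   _looks_like_path("requests") -> False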


def _get_url_from_path(path: str, name: str) -> Optional[str]:
    """
    First, check whether the provided path is an installable directory. If it
    is, return its file: URL.

    Otherwise, check whether the path is an archive file (such as a .whl).
    If it is an existing file, return its file: URL. If the name contains an
    '@' and the part before it does not look like a path, return None so the
    name can be treated as a PEP 440 URL requirement instead.
    """
    if _looks_like_path(name) and os.path.isdir(path):
        if is_installable_dir(path):
            return path_to_url(path)
        # TODO: The is_installable_dir test here might not be necessary
        #       now that it is done in load_pyproject_toml too.
        raise InstallationError(
            f"Directory {name!r} is not installable. Neither 'setup.py' "
            "nor 'pyproject.toml' found."
        )
    if not is_archive_file(path):
        return None
    if os.path.isfile(path):
        return path_to_url(path)
    urlreq_parts = name.split("@", 1)
    if len(urlreq_parts) >= 2 and not _looks_like_path(urlreq_parts[0]):
        # If the path contains '@' and the part before it does not look
        # like a path, try to treat it as a PEP 440 URL req instead.
        return None
    logger.warning(
        "Requirement %r looks like a filename, but the file does not exist",
        name,
    )
    return path_to_url(path)
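
# For illustration (a sketch, not part of the module):
#   _get_url_from_path("/home/u/proj", "./proj")    -> "file:///home/u/proj"
#     (assuming the directory contains a setup.py or pyproject.toml)
#   _get_url_from_path("/abs/requests", "requests") -> None
#     (neither a path-looking name nor an archive file)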


def parse_req_from_line(name: str, line_source: Optional[str]) -> RequirementParts:
    if is_url(name):
        marker_sep = "; "
    else:
        marker_sep = ";"
    if marker_sep in name:
        name, markers_as_string = name.split(marker_sep, 1)
        markers_as_string = markers_as_string.strip()
        if not markers_as_string:
            markers = None
        else:
            markers = Marker(markers_as_string)
    else:
        markers = None
    name = name.strip()
    req_as_string = None
    path = os.path.normpath(os.path.abspath(name))
    link = None
    extras_as_string = None

    if is_url(name):
        link = Link(name)
    else:
        p, extras_as_string = _strip_extras(path)
        url = _get_url_from_path(p, name)
        if url is not None:
            link = Link(url)

    # it's a local file, dir, or url
    if link:
        # Handle relative file URLs
        if link.scheme == "file" and re.search(r"\.\./", link.url):
            link = Link(path_to_url(os.path.normpath(os.path.abspath(link.path))))
        # wheel file
        if link.is_wheel:
            wheel = Wheel(link.filename)  # can raise InvalidWheelFilename
            req_as_string = f"{wheel.name}=={wheel.version}"
        else:
            # Set the req to the egg fragment. When it's not there, this
            # will become an 'unnamed' requirement.
            req_as_string = link.egg_fragment

    # a requirement specifier
    else:
        req_as_string = name

    extras = convert_extras(extras_as_string)

    def with_source(text: str) -> str:
        if not line_source:
            return text
        return f"{text} (from {line_source})"

    def _parse_req_string(req_as_string: str) -> Requirement:
        try:
            return get_requirement(req_as_string)
        except InvalidRequirement as exc:
            if os.path.sep in req_as_string:
                add_msg = "It looks like a path."
                add_msg += deduce_helpful_msg(req_as_string)
            elif "=" in req_as_string and not any(
                op in req_as_string for op in operators
            ):
                add_msg = "= is not a valid operator. Did you mean == ?"
            else:
                add_msg = ""
            msg = with_source(f"Invalid requirement: {req_as_string!r}: {exc}")
            if add_msg:
                msg += f"\nHint: {add_msg}"
            raise InstallationError(msg)

    if req_as_string is not None:
        req: Optional[Requirement] = _parse_req_string(req_as_string)
    else:
        req = None

    return RequirementParts(req, link, markers, extras)
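
# For illustration (a sketch, not part of the module):
#   parse_req_from_line("requests>=2.28 ; python_version >= '3.8'", None)
#     -> RequirementParts(requirement=<Requirement('requests>=2.28')>, link=None,
#                         markers=<Marker('python_version >= "3.8"')>, extras=set())
#   parse_req_from_line("./downloads/pip-24.0-py3-none-any.whl", None)
#     -> a file: link plus req "pip==24.0" derived from the wheel filename
#        (assuming the file exists on disk)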


def install_req_from_line(
    name: str,
    comes_from: Optional[Union[str, InstallRequirement]] = None,
    *,
    use_pep517: Optional[bool] = None,
    isolated: bool = False,
    global_options: Optional[List[str]] = None,
    hash_options: Optional[Dict[str, List[str]]] = None,
    constraint: bool = False,
    line_source: Optional[str] = None,
    user_supplied: bool = False,
    config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
) -> InstallRequirement:
    """Creates an InstallRequirement from a name, which might be a
    requirement, directory containing 'setup.py', filename, or URL.

    :param line_source: An optional string describing where the line is from,
        for logging purposes in case of an error.
    """
    parts = parse_req_from_line(name, line_source)

    return InstallRequirement(
        parts.requirement,
        comes_from,
        link=parts.link,
        markers=parts.markers,
        use_pep517=use_pep517,
        isolated=isolated,
        global_options=global_options,
        hash_options=hash_options,
        config_settings=config_settings,
        constraint=constraint,
        extras=parts.extras,
        user_supplied=user_supplied,
    )


def install_req_from_req_string(
    req_string: str,
    comes_from: Optional[InstallRequirement] = None,
    isolated: bool = False,
    use_pep517: Optional[bool] = None,
    user_supplied: bool = False,
) -> InstallRequirement:
    try:
        req = get_requirement(req_string)
    except InvalidRequirement as exc:
        raise InstallationError(f"Invalid requirement: {req_string!r}: {exc}")

    domains_not_allowed = [
        PyPI.file_storage_domain,
        TestPyPI.file_storage_domain,
    ]
    if (
        req.url
        and comes_from
        and comes_from.link
        and comes_from.link.netloc in domains_not_allowed
    ):
        # Explicitly disallow pypi packages that depend on external urls
        raise InstallationError(
            "Packages installed from PyPI cannot depend on packages "
            "which are not also hosted on PyPI.\n"
            f"{comes_from.name} depends on {req} "
        )

    return InstallRequirement(
        req,
        comes_from,
        isolated=isolated,
        use_pep517=use_pep517,
        user_supplied=user_supplied,
    )


def install_req_from_parsed_requirement(
    parsed_req: ParsedRequirement,
    isolated: bool = False,
    use_pep517: Optional[bool] = None,
    user_supplied: bool = False,
    config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
) -> InstallRequirement:
    if parsed_req.is_editable:
        req = install_req_from_editable(
            parsed_req.requirement,
            comes_from=parsed_req.comes_from,
            use_pep517=use_pep517,
            constraint=parsed_req.constraint,
            isolated=isolated,
            user_supplied=user_supplied,
            config_settings=config_settings,
        )
    else:
        req = install_req_from_line(
            parsed_req.requirement,
            comes_from=parsed_req.comes_from,
            use_pep517=use_pep517,
            isolated=isolated,
            global_options=(
                parsed_req.options.get("global_options", [])
                if parsed_req.options
                else []
            ),
            hash_options=(
                parsed_req.options.get("hashes", {}) if parsed_req.options else {}
            ),
            constraint=parsed_req.constraint,
            line_source=parsed_req.line_source,
            user_supplied=user_supplied,
            config_settings=config_settings,
        )
    return req


def install_req_from_link_and_ireq(
    link: Link, ireq: InstallRequirement
) -> InstallRequirement:
    return InstallRequirement(
        req=ireq.req,
        comes_from=ireq.comes_from,
        editable=ireq.editable,
        link=link,
        markers=ireq.markers,
        use_pep517=ireq.use_pep517,
        isolated=ireq.isolated,
        global_options=ireq.global_options,
        hash_options=ireq.hash_options,
        config_settings=ireq.config_settings,
        user_supplied=ireq.user_supplied,
    )


def install_req_drop_extras(ireq: InstallRequirement) -> InstallRequirement:
    """
    Creates a new InstallRequirement using the given template but without
    any extras. Sets the original requirement as the new one's parent
    (comes_from).
    """
    return InstallRequirement(
        req=(
            _set_requirement_extras(ireq.req, set()) if ireq.req is not None else None
        ),
        comes_from=ireq,
        editable=ireq.editable,
        link=ireq.link,
        markers=ireq.markers,
        use_pep517=ireq.use_pep517,
        isolated=ireq.isolated,
        global_options=ireq.global_options,
        hash_options=ireq.hash_options,
        constraint=ireq.constraint,
        extras=[],
        config_settings=ireq.config_settings,
        user_supplied=ireq.user_supplied,
        permit_editable_wheels=ireq.permit_editable_wheels,
    )


def install_req_extend_extras(
    ireq: InstallRequirement,
    extras: Collection[str],
) -> InstallRequirement:
    """
    Returns a copy of an installation requirement with some additional extras.
    Makes a shallow copy of the ireq object.
    """
    result = copy.copy(ireq)
    result.extras = {*ireq.extras, *extras}
    result.req = (
        _set_requirement_extras(ireq.req, result.extras)
        if ireq.req is not None
        else None
    )
    return result
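
A minimal usage sketch of the line constructor (assumptions: pip's private
internals are importable in this environment; they may change between releases):

    from pip._internal.req.constructors import install_req_from_line

    ireq = install_req_from_line("requests>=2.28 ; python_version >= '3.8'")
    print(ireq.req)      # requests>=2.28
    print(ireq.markers)  # python_version >= "3.8"
    print(ireq.link)     # None -- a plain specifier carries no link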
							
								
								
									
.venv/lib/python3.11/site-packages/pip/_internal/req/req_file.py (new file, 623 lines)

@@ -0,0 +1,623 @@
| """ | ||||
| Requirements file parsing | ||||
| """ | ||||
| 
 | ||||
| import codecs | ||||
| import locale | ||||
| import logging | ||||
| import optparse | ||||
| import os | ||||
| import re | ||||
| import shlex | ||||
| import sys | ||||
| import urllib.parse | ||||
| from dataclasses import dataclass | ||||
| from optparse import Values | ||||
| from typing import ( | ||||
|     TYPE_CHECKING, | ||||
|     Any, | ||||
|     Callable, | ||||
|     Dict, | ||||
|     Generator, | ||||
|     Iterable, | ||||
|     List, | ||||
|     NoReturn, | ||||
|     Optional, | ||||
|     Tuple, | ||||
| ) | ||||
| 
 | ||||
| from pip._internal.cli import cmdoptions | ||||
| from pip._internal.exceptions import InstallationError, RequirementsFileParseError | ||||
| from pip._internal.models.search_scope import SearchScope | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from pip._internal.index.package_finder import PackageFinder | ||||
|     from pip._internal.network.session import PipSession | ||||
| 
 | ||||
| __all__ = ["parse_requirements"] | ||||
| 
 | ||||
| ReqFileLines = Iterable[Tuple[int, str]] | ||||
| 
 | ||||
| LineParser = Callable[[str], Tuple[str, Values]] | ||||
| 
 | ||||
| SCHEME_RE = re.compile(r"^(http|https|file):", re.I) | ||||
| COMMENT_RE = re.compile(r"(^|\s+)#.*$") | ||||
| 
 | ||||
| # Matches environment variable-style values in '${MY_VARIABLE_1}' with the | ||||
| # variable name consisting of only uppercase letters, digits or the '_' | ||||
| # (underscore). This follows the POSIX standard defined in IEEE Std 1003.1, | ||||
| # 2013 Edition. | ||||
| ENV_VAR_RE = re.compile(r"(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})") | ||||
| 
 | ||||
| SUPPORTED_OPTIONS: List[Callable[..., optparse.Option]] = [ | ||||
|     cmdoptions.index_url, | ||||
|     cmdoptions.extra_index_url, | ||||
|     cmdoptions.no_index, | ||||
|     cmdoptions.constraints, | ||||
|     cmdoptions.requirements, | ||||
|     cmdoptions.editable, | ||||
|     cmdoptions.find_links, | ||||
|     cmdoptions.no_binary, | ||||
|     cmdoptions.only_binary, | ||||
|     cmdoptions.prefer_binary, | ||||
|     cmdoptions.require_hashes, | ||||
|     cmdoptions.pre, | ||||
|     cmdoptions.trusted_host, | ||||
|     cmdoptions.use_new_feature, | ||||
| ] | ||||
| 
 | ||||
| # options to be passed to requirements | ||||
| SUPPORTED_OPTIONS_REQ: List[Callable[..., optparse.Option]] = [ | ||||
|     cmdoptions.global_options, | ||||
|     cmdoptions.hash, | ||||
|     cmdoptions.config_settings, | ||||
| ] | ||||
| 
 | ||||
| SUPPORTED_OPTIONS_EDITABLE_REQ: List[Callable[..., optparse.Option]] = [ | ||||
|     cmdoptions.config_settings, | ||||
| ] | ||||
| 
 | ||||
| 
 | ||||
| # the 'dest' string values | ||||
| SUPPORTED_OPTIONS_REQ_DEST = [str(o().dest) for o in SUPPORTED_OPTIONS_REQ] | ||||
| SUPPORTED_OPTIONS_EDITABLE_REQ_DEST = [ | ||||
|     str(o().dest) for o in SUPPORTED_OPTIONS_EDITABLE_REQ | ||||
| ] | ||||
| 
 | ||||
| # order of BOMS is important: codecs.BOM_UTF16_LE is a prefix of codecs.BOM_UTF32_LE | ||||
| # so data.startswith(BOM_UTF16_LE) would be true for UTF32_LE data | ||||
| BOMS: List[Tuple[bytes, str]] = [ | ||||
|     (codecs.BOM_UTF8, "utf-8"), | ||||
|     (codecs.BOM_UTF32, "utf-32"), | ||||
|     (codecs.BOM_UTF32_BE, "utf-32-be"), | ||||
|     (codecs.BOM_UTF32_LE, "utf-32-le"), | ||||
|     (codecs.BOM_UTF16, "utf-16"), | ||||
|     (codecs.BOM_UTF16_BE, "utf-16-be"), | ||||
|     (codecs.BOM_UTF16_LE, "utf-16-le"), | ||||
| ] | ||||
| 
 | ||||
| PEP263_ENCODING_RE = re.compile(rb"coding[:=]\s*([-\w.]+)") | ||||
| DEFAULT_ENCODING = "utf-8" | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| @dataclass(frozen=True) | ||||
| class ParsedRequirement: | ||||
|     # TODO: replace this with slots=True when dropping Python 3.9 support. | ||||
|     __slots__ = ( | ||||
|         "requirement", | ||||
|         "is_editable", | ||||
|         "comes_from", | ||||
|         "constraint", | ||||
|         "options", | ||||
|         "line_source", | ||||
|     ) | ||||
| 
 | ||||
|     requirement: str | ||||
|     is_editable: bool | ||||
|     comes_from: str | ||||
|     constraint: bool | ||||
|     options: Optional[Dict[str, Any]] | ||||
|     line_source: Optional[str] | ||||
| 
 | ||||
| 
 | ||||
| @dataclass(frozen=True) | ||||
| class ParsedLine: | ||||
|     __slots__ = ("filename", "lineno", "args", "opts", "constraint") | ||||
| 
 | ||||
|     filename: str | ||||
|     lineno: int | ||||
|     args: str | ||||
|     opts: Values | ||||
|     constraint: bool | ||||
| 
 | ||||
|     @property | ||||
|     def is_editable(self) -> bool: | ||||
|         return bool(self.opts.editables) | ||||
| 
 | ||||
|     @property | ||||
|     def requirement(self) -> Optional[str]: | ||||
|         if self.args: | ||||
|             return self.args | ||||
|         elif self.is_editable: | ||||
|             # We don't support multiple -e on one line | ||||
|             return self.opts.editables[0] | ||||
|         return None | ||||
| 
 | ||||
| 
 | ||||
| def parse_requirements( | ||||
|     filename: str, | ||||
|     session: "PipSession", | ||||
|     finder: Optional["PackageFinder"] = None, | ||||
|     options: Optional[optparse.Values] = None, | ||||
|     constraint: bool = False, | ||||
| ) -> Generator[ParsedRequirement, None, None]: | ||||
|     """Parse a requirements file and yield ParsedRequirement instances. | ||||
| 
 | ||||
|     :param filename:    Path or url of requirements file. | ||||
|     :param session:     PipSession instance. | ||||
|     :param finder:      Instance of pip.index.PackageFinder. | ||||
|     :param options:     cli options. | ||||
|     :param constraint:  If true, parsing a constraint file rather than | ||||
|         requirements file. | ||||
|     """ | ||||
|     line_parser = get_line_parser(finder) | ||||
|     parser = RequirementsFileParser(session, line_parser) | ||||
| 
 | ||||
|     for parsed_line in parser.parse(filename, constraint): | ||||
|         parsed_req = handle_line( | ||||
|             parsed_line, options=options, finder=finder, session=session | ||||
|         ) | ||||
|         if parsed_req is not None: | ||||
|             yield parsed_req | ||||
| 
 | ||||
| 
 | ||||
| def preprocess(content: str) -> ReqFileLines: | ||||
|     """Split, filter, and join lines, and return a line iterator | ||||
| 
 | ||||
|     :param content: the content of the requirements file | ||||
|     """ | ||||
|     lines_enum: ReqFileLines = enumerate(content.splitlines(), start=1) | ||||
|     lines_enum = join_lines(lines_enum) | ||||
|     lines_enum = ignore_comments(lines_enum) | ||||
|     lines_enum = expand_env_variables(lines_enum) | ||||
|     return lines_enum | ||||
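
# For illustration (a sketch, not part of the module):
#   list(preprocess("requests==2.0  # pinned\n-r more.txt\n"))
#     -> [(1, "requests==2.0"), (2, "-r more.txt")]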


def handle_requirement_line(
    line: ParsedLine,
    options: Optional[optparse.Values] = None,
) -> ParsedRequirement:
    # preserve for the nested code path
    line_comes_from = "{} {} (line {})".format(
        "-c" if line.constraint else "-r",
        line.filename,
        line.lineno,
    )

    assert line.requirement is not None

    # get the options that apply to requirements
    if line.is_editable:
        supported_dest = SUPPORTED_OPTIONS_EDITABLE_REQ_DEST
    else:
        supported_dest = SUPPORTED_OPTIONS_REQ_DEST
    req_options = {}
    for dest in supported_dest:
        if dest in line.opts.__dict__ and line.opts.__dict__[dest]:
            req_options[dest] = line.opts.__dict__[dest]

    line_source = f"line {line.lineno} of {line.filename}"
    return ParsedRequirement(
        requirement=line.requirement,
        is_editable=line.is_editable,
        comes_from=line_comes_from,
        constraint=line.constraint,
        options=req_options,
        line_source=line_source,
    )


def handle_option_line(
    opts: Values,
    filename: str,
    lineno: int,
    finder: Optional["PackageFinder"] = None,
    options: Optional[optparse.Values] = None,
    session: Optional["PipSession"] = None,
) -> None:
    if opts.hashes:
        logger.warning(
            "%s line %s has --hash but no requirement, and will be ignored.",
            filename,
            lineno,
        )

    if options:
        # percolate options upward
        if opts.require_hashes:
            options.require_hashes = opts.require_hashes
        if opts.features_enabled:
            options.features_enabled.extend(
                f for f in opts.features_enabled if f not in options.features_enabled
            )

    # set finder options
    if finder:
        find_links = finder.find_links
        index_urls = finder.index_urls
        no_index = finder.search_scope.no_index
        if opts.no_index is True:
            no_index = True
            index_urls = []
        if opts.index_url and not no_index:
            index_urls = [opts.index_url]
        if opts.extra_index_urls and not no_index:
            index_urls.extend(opts.extra_index_urls)
        if opts.find_links:
            # FIXME: it would be nice to keep track of the source
            # of the find_links: support a find-links local path
            # relative to a requirements file.
            value = opts.find_links[0]
            req_dir = os.path.dirname(os.path.abspath(filename))
            relative_to_reqs_file = os.path.join(req_dir, value)
            if os.path.exists(relative_to_reqs_file):
                value = relative_to_reqs_file
            find_links.append(value)

        if session:
            # We need to update the auth urls in session
            session.update_index_urls(index_urls)

        search_scope = SearchScope(
            find_links=find_links,
            index_urls=index_urls,
            no_index=no_index,
        )
        finder.search_scope = search_scope

        if opts.pre:
            finder.set_allow_all_prereleases()

        if opts.prefer_binary:
            finder.set_prefer_binary()

        if session:
            for host in opts.trusted_hosts or []:
                source = f"line {lineno} of {filename}"
                session.add_trusted_host(host, source=source)


def handle_line(
    line: ParsedLine,
    options: Optional[optparse.Values] = None,
    finder: Optional["PackageFinder"] = None,
    session: Optional["PipSession"] = None,
) -> Optional[ParsedRequirement]:
    """Handle a single parsed requirements line; this can result in
    creating/yielding requirements, or updating the finder.

    :param line:        The parsed line to be processed.
    :param options:     CLI options.
    :param finder:      The finder - updated by non-requirement lines.
    :param session:     The session - updated by non-requirement lines.

    Returns a ParsedRequirement object if the line is a requirement line,
    otherwise returns None.

    For lines that contain requirements, the only options that have an effect
    are from SUPPORTED_OPTIONS_REQ, and they are scoped to the
    requirement. Other options from SUPPORTED_OPTIONS may be present, but are
    ignored.

    For lines that do not contain requirements, the only options that have an
    effect are from SUPPORTED_OPTIONS. Options from SUPPORTED_OPTIONS_REQ may
    be present, but are ignored. These lines may contain multiple options
    (although our docs imply only one is supported), and all are parsed and
    affect the finder.
    """

    if line.requirement is not None:
        parsed_req = handle_requirement_line(line, options)
        return parsed_req
    else:
        handle_option_line(
            line.opts,
            line.filename,
            line.lineno,
            finder,
            options,
            session,
        )
        return None


class RequirementsFileParser:
    def __init__(
        self,
        session: "PipSession",
        line_parser: LineParser,
    ) -> None:
        self._session = session
        self._line_parser = line_parser

    def parse(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        """Parse a given file, yielding parsed lines."""
        yield from self._parse_and_recurse(
            filename, constraint, [{os.path.abspath(filename): None}]
        )

    def _parse_and_recurse(
        self,
        filename: str,
        constraint: bool,
        parsed_files_stack: List[Dict[str, Optional[str]]],
    ) -> Generator[ParsedLine, None, None]:
        for line in self._parse_file(filename, constraint):
            if line.requirement is None and (
                line.opts.requirements or line.opts.constraints
            ):
                # parse a nested requirements file
                if line.opts.requirements:
                    req_path = line.opts.requirements[0]
                    nested_constraint = False
                else:
                    req_path = line.opts.constraints[0]
                    nested_constraint = True

                # original file is over http
                if SCHEME_RE.search(filename):
                    # do a url join so relative paths work
                    req_path = urllib.parse.urljoin(filename, req_path)
                # original file and nested file are paths
                elif not SCHEME_RE.search(req_path):
                    # do a join so relative paths work,
                    # then abspath so that we can identify recursive references
                    req_path = os.path.abspath(
                        os.path.join(
                            os.path.dirname(filename),
                            req_path,
                        )
                    )
                parsed_files = parsed_files_stack[0]
                if req_path in parsed_files:
                    initial_file = parsed_files[req_path]
                    tail = (
                        f" and again in {initial_file}"
                        if initial_file is not None
                        else ""
                    )
                    raise RequirementsFileParseError(
                        f"{req_path} recursively references itself in {filename}{tail}"
                    )
                # Keep track of where each file was first included from.
                new_parsed_files = parsed_files.copy()
                new_parsed_files[req_path] = filename
                yield from self._parse_and_recurse(
                    req_path, nested_constraint, [new_parsed_files, *parsed_files_stack]
                )
            else:
                yield line

    def _parse_file(
        self, filename: str, constraint: bool
    ) -> Generator[ParsedLine, None, None]:
        _, content = get_file_content(filename, self._session)

        lines_enum = preprocess(content)

        for line_number, line in lines_enum:
            try:
                args_str, opts = self._line_parser(line)
            except OptionParsingError as e:
                # add the offending line to the message
                msg = f"Invalid requirement: {line}\n{e.msg}"
                raise RequirementsFileParseError(msg)

            yield ParsedLine(
                filename,
                line_number,
                args_str,
                opts,
                constraint,
            )


def get_line_parser(finder: Optional["PackageFinder"]) -> LineParser:
    def parse_line(line: str) -> Tuple[str, Values]:
        # Build a new parser for each line since it accumulates appendable
        # options.
        parser = build_parser()
        defaults = parser.get_default_values()
        defaults.index_url = None
        if finder:
            defaults.format_control = finder.format_control

        args_str, options_str = break_args_options(line)

        try:
            options = shlex.split(options_str)
        except ValueError as e:
            raise OptionParsingError(f"Could not split options: {options_str}") from e

        opts, _ = parser.parse_args(options, defaults)

        return args_str, opts

    return parse_line


def break_args_options(line: str) -> Tuple[str, str]:
    """Break up the line into an args and options string. We only want to shlex
    (and then optparse) the options, not the args. args can contain markers
    which are corrupted by shlex.
    """
    tokens = line.split(" ")
    args = []
    options = tokens[:]
    for token in tokens:
        if token.startswith("-"):
            break
        else:
            args.append(token)
            options.pop(0)
    return " ".join(args), " ".join(options)


class OptionParsingError(Exception):
    def __init__(self, msg: str) -> None:
        self.msg = msg


def build_parser() -> optparse.OptionParser:
    """
    Return a parser for parsing requirement lines
    """
    parser = optparse.OptionParser(add_help_option=False)

    option_factories = SUPPORTED_OPTIONS + SUPPORTED_OPTIONS_REQ
    for option_factory in option_factories:
        option = option_factory()
        parser.add_option(option)

    # By default optparse sys.exits on parsing errors. We want to wrap
    # that in our own exception.
    def parser_exit(self: Any, msg: str) -> "NoReturn":
        raise OptionParsingError(msg)

    # NOTE: mypy disallows assigning to a method
    #       https://github.com/python/mypy/issues/2427
    parser.exit = parser_exit  # type: ignore

    return parser


def join_lines(lines_enum: ReqFileLines) -> ReqFileLines:
    """Joins a line ending in '\\' with the previous line (except when following
    comments). The joined line takes on the index of the first line.
    """
    primary_line_number = None
    new_line: List[str] = []
    for line_number, line in lines_enum:
        if not line.endswith("\\") or COMMENT_RE.match(line):
            if COMMENT_RE.match(line):
                # this ensures comments are always matched later
                line = " " + line
            if new_line:
                new_line.append(line)
                assert primary_line_number is not None
                yield primary_line_number, "".join(new_line)
                new_line = []
            else:
                yield line_number, line
        else:
            if not new_line:
                primary_line_number = line_number
            new_line.append(line.strip("\\"))

    # last line contains \
    if new_line:
        assert primary_line_number is not None
        yield primary_line_number, "".join(new_line)

    # TODO: handle space after '\'.
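
# For illustration (a sketch, not part of the module):
#   list(join_lines(enumerate(["certifi \\", "==2024.2.2"], start=1)))
#     -> [(1, "certifi ==2024.2.2")]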


def ignore_comments(lines_enum: ReqFileLines) -> ReqFileLines:
    """
    Strips comments and filters empty lines.
    """
    for line_number, line in lines_enum:
        line = COMMENT_RE.sub("", line)
        line = line.strip()
        if line:
            yield line_number, line


def expand_env_variables(lines_enum: ReqFileLines) -> ReqFileLines:
    """Replace all environment variables that can be retrieved via `os.getenv`.

    The only allowed format for environment variables defined in the
    requirements file is `${MY_VARIABLE_1}`, to ensure two things:

    1. Strings that contain a `$` aren't accidentally (partially) expanded.
    2. Consistency across platforms for requirements files.

    These points are the result of a discussion on the `github pull
    request #3514 <https://github.com/pypa/pip/pull/3514>`_.

    Valid characters in variable names follow the `POSIX standard
    <http://pubs.opengroup.org/onlinepubs/9699919799/>`_ and are limited
    to uppercase letters, digits and the `_` (underscore).
    """
    for line_number, line in lines_enum:
        for env_var, var_name in ENV_VAR_RE.findall(line):
            value = os.getenv(var_name)
            if not value:
                continue

            line = line.replace(env_var, value)

        yield line_number, line
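
# For illustration (a sketch, not part of the module):
#   With MY_TOKEN=s3cret set in the environment:
#   list(expand_env_variables([(1, "https://user:${MY_TOKEN}@pypi.example.com/simple")]))
#     -> [(1, "https://user:s3cret@pypi.example.com/simple")]
#   Lowercase forms such as ${my_token} do not match ENV_VAR_RE and are left as-is.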


def get_file_content(url: str, session: "PipSession") -> Tuple[str, str]:
    """Gets the content of a file; it may be a filename, file: URL, or
    http: URL. Returns (location, content). Content is unicode.
    Respects # -*- coding: declarations on the retrieved files.

    :param url:         File path or URL.
    :param session:     PipSession instance.
    """
    scheme = urllib.parse.urlsplit(url).scheme
    # Pip has special support for file:// URLs (LocalFSAdapter).
    if scheme in ["http", "https", "file"]:
        # Delay importing heavy network modules until absolutely necessary.
        from pip._internal.network.utils import raise_for_status

        resp = session.get(url)
        raise_for_status(resp)
        return resp.url, resp.text

    # Assume this is a bare path.
    try:
        with open(url, "rb") as f:
            raw_content = f.read()
    except OSError as exc:
        raise InstallationError(f"Could not open requirements file: {exc}")

    content = _decode_req_file(raw_content, url)

    return url, content


def _decode_req_file(data: bytes, url: str) -> str:
    for bom, encoding in BOMS:
        if data.startswith(bom):
            return data[len(bom) :].decode(encoding)

    for line in data.split(b"\n")[:2]:
        if line[0:1] == b"#":
            result = PEP263_ENCODING_RE.search(line)
            if result is not None:
                encoding = result.groups()[0].decode("ascii")
                return data.decode(encoding)

    try:
        return data.decode(DEFAULT_ENCODING)
    except UnicodeDecodeError:
        locale_encoding = locale.getpreferredencoding(False) or sys.getdefaultencoding()
        logger.warning(
            "unable to decode data from %s with default encoding %s, "
            "falling back to encoding from locale: %s. "
            "If this is intentional you should specify the encoding with a "
            "PEP-263 style comment, e.g. '# -*- coding: %s -*-'",
            url,
            DEFAULT_ENCODING,
            locale_encoding,
            locale_encoding,
        )
        return data.decode(locale_encoding)
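
The BOMS table above drives _decode_req_file's first decoding attempt. A
standalone sketch of the same startswith logic (assumes nothing from pip):

    import codecs

    BOMS = [
        (codecs.BOM_UTF8, "utf-8"),
        # UTF-32 is checked before UTF-16 because BOM_UTF16_LE is a prefix
        # of BOM_UTF32_LE.
        (codecs.BOM_UTF32, "utf-32"),
        (codecs.BOM_UTF16, "utf-16"),
    ]

    data = codecs.BOM_UTF8 + b"requests==2.0\n"
    for bom, encoding in BOMS:
        if data.startswith(bom):
            print(data[len(bom):].decode(encoding))  # requests==2.0
            break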
.venv/lib/python3.11/site-packages/pip/_internal/req/req_install.py (new file, 934 lines)

@@ -0,0 +1,934 @@
| import functools | ||||
| import logging | ||||
| import os | ||||
| import shutil | ||||
| import sys | ||||
| import uuid | ||||
| import zipfile | ||||
| from optparse import Values | ||||
| from pathlib import Path | ||||
| from typing import Any, Collection, Dict, Iterable, List, Optional, Sequence, Union | ||||
| 
 | ||||
| from pip._vendor.packaging.markers import Marker | ||||
| from pip._vendor.packaging.requirements import Requirement | ||||
| from pip._vendor.packaging.specifiers import SpecifierSet | ||||
| from pip._vendor.packaging.utils import canonicalize_name | ||||
| from pip._vendor.packaging.version import Version | ||||
| from pip._vendor.packaging.version import parse as parse_version | ||||
| from pip._vendor.pyproject_hooks import BuildBackendHookCaller | ||||
| 
 | ||||
| from pip._internal.build_env import BuildEnvironment, NoOpBuildEnvironment | ||||
| from pip._internal.exceptions import InstallationError, PreviousBuildDirError | ||||
| from pip._internal.locations import get_scheme | ||||
| from pip._internal.metadata import ( | ||||
|     BaseDistribution, | ||||
|     get_default_environment, | ||||
|     get_directory_distribution, | ||||
|     get_wheel_distribution, | ||||
| ) | ||||
| from pip._internal.metadata.base import FilesystemWheel | ||||
| from pip._internal.models.direct_url import DirectUrl | ||||
| from pip._internal.models.link import Link | ||||
| from pip._internal.operations.build.metadata import generate_metadata | ||||
| from pip._internal.operations.build.metadata_editable import generate_editable_metadata | ||||
| from pip._internal.operations.build.metadata_legacy import ( | ||||
|     generate_metadata as generate_metadata_legacy, | ||||
| ) | ||||
| from pip._internal.operations.install.editable_legacy import ( | ||||
|     install_editable as install_editable_legacy, | ||||
| ) | ||||
| from pip._internal.operations.install.wheel import install_wheel | ||||
| from pip._internal.pyproject import load_pyproject_toml, make_pyproject_path | ||||
| from pip._internal.req.req_uninstall import UninstallPathSet | ||||
| from pip._internal.utils.deprecation import deprecated | ||||
| from pip._internal.utils.hashes import Hashes | ||||
| from pip._internal.utils.misc import ( | ||||
|     ConfiguredBuildBackendHookCaller, | ||||
|     ask_path_exists, | ||||
|     backup_dir, | ||||
|     display_path, | ||||
|     hide_url, | ||||
|     is_installable_dir, | ||||
|     redact_auth_from_requirement, | ||||
|     redact_auth_from_url, | ||||
| ) | ||||
| from pip._internal.utils.packaging import get_requirement | ||||
| from pip._internal.utils.subprocess import runner_with_spinner_message | ||||
| from pip._internal.utils.temp_dir import TempDirectory, tempdir_kinds | ||||
| from pip._internal.utils.unpacking import unpack_file | ||||
| from pip._internal.utils.virtualenv import running_under_virtualenv | ||||
| from pip._internal.vcs import vcs | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| class InstallRequirement: | ||||
|     """ | ||||
|     Represents something that may be installed later on; it may carry | ||||
|     information about where to fetch the relevant requirement and also | ||||
|     contains the logic for installing that requirement. | ||||
|     """ | ||||
| 
 | ||||
|     def __init__( | ||||
|         self, | ||||
|         req: Optional[Requirement], | ||||
|         comes_from: Optional[Union[str, "InstallRequirement"]], | ||||
|         editable: bool = False, | ||||
|         link: Optional[Link] = None, | ||||
|         markers: Optional[Marker] = None, | ||||
|         use_pep517: Optional[bool] = None, | ||||
|         isolated: bool = False, | ||||
|         *, | ||||
|         global_options: Optional[List[str]] = None, | ||||
|         hash_options: Optional[Dict[str, List[str]]] = None, | ||||
|         config_settings: Optional[Dict[str, Union[str, List[str]]]] = None, | ||||
|         constraint: bool = False, | ||||
|         extras: Collection[str] = (), | ||||
|         user_supplied: bool = False, | ||||
|         permit_editable_wheels: bool = False, | ||||
|     ) -> None: | ||||
|         assert req is None or isinstance(req, Requirement), req | ||||
|         self.req = req | ||||
|         self.comes_from = comes_from | ||||
|         self.constraint = constraint | ||||
|         self.editable = editable | ||||
|         self.permit_editable_wheels = permit_editable_wheels | ||||
| 
 | ||||
|         # source_dir is the local directory where the linked requirement is | ||||
|         # located, or unpacked. In case unpacking is needed, creating and | ||||
|         # populating source_dir is done by the RequirementPreparer. Note this | ||||
|         # is not necessarily the directory where pyproject.toml or setup.py is | ||||
|         # located - that one is obtained via unpacked_source_directory. | ||||
|         self.source_dir: Optional[str] = None | ||||
|         if self.editable: | ||||
|             assert link | ||||
|             if link.is_file: | ||||
|                 self.source_dir = os.path.normpath(os.path.abspath(link.file_path)) | ||||
| 
 | ||||
|         # original_link is the direct URL that was provided by the user for the | ||||
|         # requirement, either directly or via a constraints file. | ||||
|         if link is None and req and req.url: | ||||
|             # PEP 508 URL requirement | ||||
|             link = Link(req.url) | ||||
|         self.link = self.original_link = link | ||||
| 
 | ||||
|         # When this InstallRequirement is a wheel obtained from the cache of locally | ||||
|         # built wheels, this is the source link corresponding to the cache entry, which | ||||
|         # was used to download and build the cached wheel. | ||||
|         self.cached_wheel_source_link: Optional[Link] = None | ||||
| 
 | ||||
|         # Information about the location of the artifact that was downloaded. This | ||||
|         # property is guaranteed to be set in resolver results. | ||||
|         self.download_info: Optional[DirectUrl] = None | ||||
| 
 | ||||
|         # Path to any downloaded or already-existing package. | ||||
|         self.local_file_path: Optional[str] = None | ||||
|         if self.link and self.link.is_file: | ||||
|             self.local_file_path = self.link.file_path | ||||
| 
 | ||||
|         if extras: | ||||
|             self.extras = extras | ||||
|         elif req: | ||||
|             self.extras = req.extras | ||||
|         else: | ||||
|             self.extras = set() | ||||
|         if markers is None and req: | ||||
|             markers = req.marker | ||||
|         self.markers = markers | ||||
| 
 | ||||
|         # This holds the Distribution object if this requirement is already installed. | ||||
|         self.satisfied_by: Optional[BaseDistribution] = None | ||||
|         # Whether the installation process should try to uninstall an existing | ||||
|         # distribution before installing this requirement. | ||||
|         self.should_reinstall = False | ||||
|         # Temporary build location | ||||
|         self._temp_build_dir: Optional[TempDirectory] = None | ||||
|         # Set to True after successful installation | ||||
|         self.install_succeeded: Optional[bool] = None | ||||
|         # Supplied options | ||||
|         self.global_options = global_options if global_options else [] | ||||
|         self.hash_options = hash_options if hash_options else {} | ||||
|         self.config_settings = config_settings | ||||
|         # Set to True after successful preparation of this requirement | ||||
|         self.prepared = False | ||||
|         # User-supplied requirements are explicitly requested for installation | ||||
|         # by the user via CLI arguments or requirements files, as opposed to, | ||||
|         # e.g., dependencies, extras or constraints. | ||||
|         self.user_supplied = user_supplied | ||||
| 
 | ||||
|         self.isolated = isolated | ||||
|         self.build_env: BuildEnvironment = NoOpBuildEnvironment() | ||||
| 
 | ||||
|         # For PEP 517, the directory where we request the project metadata | ||||
|         # gets stored. We need this to pass to build_wheel, so the backend | ||||
|         # can ensure that the wheel matches the metadata (see the PEP for | ||||
|         # details). | ||||
|         self.metadata_directory: Optional[str] = None | ||||
| 
 | ||||
|         # The static build requirements (from pyproject.toml) | ||||
|         self.pyproject_requires: Optional[List[str]] = None | ||||
| 
 | ||||
|         # Build requirements that we will check are available | ||||
|         self.requirements_to_check: List[str] = [] | ||||
| 
 | ||||
|         # The PEP 517 backend we should use to build the project | ||||
|         self.pep517_backend: Optional[BuildBackendHookCaller] = None | ||||
| 
 | ||||
|         # Are we using PEP 517 for this requirement? | ||||
|         # After pyproject.toml has been loaded, the only valid values are True | ||||
|         # and False. Before loading, None is valid (meaning "use the default"). | ||||
|         # Setting an explicit value before loading pyproject.toml is supported, | ||||
|         # but after loading this flag should be treated as read only. | ||||
|         self.use_pep517 = use_pep517 | ||||
| 
 | ||||
|         # If config settings are provided, enforce PEP 517. | ||||
|         if self.config_settings: | ||||
|             if self.use_pep517 is False: | ||||
|                 logger.warning( | ||||
|                     "--no-use-pep517 ignored for %s " | ||||
|                     "because --config-settings are specified.", | ||||
|                     self, | ||||
|                 ) | ||||
|             self.use_pep517 = True | ||||
| 
 | ||||
|         # This requirement needs more preparation before it can be built | ||||
|         self.needs_more_preparation = False | ||||
| 
 | ||||
|         # This requirement needs to be unpacked before it can be installed. | ||||
|         self._archive_source: Optional[Path] = None | ||||
| 
 | ||||
|     def __str__(self) -> str: | ||||
|         if self.req: | ||||
|             s = redact_auth_from_requirement(self.req) | ||||
|             if self.link: | ||||
|                 s += f" from {redact_auth_from_url(self.link.url)}" | ||||
|         elif self.link: | ||||
|             s = redact_auth_from_url(self.link.url) | ||||
|         else: | ||||
|             s = "<InstallRequirement>" | ||||
|         if self.satisfied_by is not None: | ||||
|             if self.satisfied_by.location is not None: | ||||
|                 location = display_path(self.satisfied_by.location) | ||||
|             else: | ||||
|                 location = "<memory>" | ||||
|             s += f" in {location}" | ||||
|         if self.comes_from: | ||||
|             if isinstance(self.comes_from, str): | ||||
|                 comes_from: Optional[str] = self.comes_from | ||||
|             else: | ||||
|                 comes_from = self.comes_from.from_path() | ||||
|             if comes_from: | ||||
|                 s += f" (from {comes_from})" | ||||
|         return s | ||||
| 
 | ||||
|     def __repr__(self) -> str: | ||||
|         return ( | ||||
|             f"<{self.__class__.__name__} object: " | ||||
|             f"{str(self)} editable={self.editable!r}>" | ||||
|         ) | ||||
| 
 | ||||
|     def format_debug(self) -> str: | ||||
|         """An un-tested helper for getting state, for debugging.""" | ||||
|         attributes = vars(self) | ||||
|         names = sorted(attributes) | ||||
| 
 | ||||
|         state = (f"{attr}={attributes[attr]!r}" for attr in sorted(names)) | ||||
|         return "<{name} object: {{{state}}}>".format( | ||||
|             name=self.__class__.__name__, | ||||
|             state=", ".join(state), | ||||
|         ) | ||||
| 
 | ||||
|     # Things that are valid for all kinds of requirements? | ||||
|     @property | ||||
|     def name(self) -> Optional[str]: | ||||
|         if self.req is None: | ||||
|             return None | ||||
|         return self.req.name | ||||
| 
 | ||||
|     @functools.cached_property | ||||
|     def supports_pyproject_editable(self) -> bool: | ||||
|         if not self.use_pep517: | ||||
|             return False | ||||
|         assert self.pep517_backend | ||||
|         with self.build_env: | ||||
|             runner = runner_with_spinner_message( | ||||
|                 "Checking if build backend supports build_editable" | ||||
|             ) | ||||
|             with self.pep517_backend.subprocess_runner(runner): | ||||
|                 return "build_editable" in self.pep517_backend._supported_features() | ||||
| 
 | ||||
|     @property | ||||
|     def specifier(self) -> SpecifierSet: | ||||
|         assert self.req is not None | ||||
|         return self.req.specifier | ||||
| 
 | ||||
|     @property | ||||
|     def is_direct(self) -> bool: | ||||
|         """Whether this requirement was specified as a direct URL.""" | ||||
|         return self.original_link is not None | ||||
| 
 | ||||
|     @property | ||||
|     def is_pinned(self) -> bool: | ||||
|         """Return whether I am pinned to an exact version. | ||||
| 
 | ||||
|         For example, some-package==1.2 is pinned; some-package>1.2 is not. | ||||
|         """ | ||||
|         assert self.req is not None | ||||
|         specifiers = self.req.specifier | ||||
|         return len(specifiers) == 1 and next(iter(specifiers)).operator in {"==", "==="} | ||||
| 
 | ||||
|     def match_markers(self, extras_requested: Optional[Iterable[str]] = None) -> bool: | ||||
|         if not extras_requested: | ||||
|             # Provide an extra to safely evaluate the markers | ||||
|             # without matching any extra | ||||
|             extras_requested = ("",) | ||||
|         if self.markers is not None: | ||||
|             return any( | ||||
|                 self.markers.evaluate({"extra": extra}) for extra in extras_requested | ||||
|             ) | ||||
|         else: | ||||
|             return True | ||||
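| 
 | ||||
|     # A minimal sketch of the empty-extra trick above, using the Marker API | ||||
|     # that pip vendors from the `packaging` project (marker illustrative): | ||||
|     # | ||||
|     #     from packaging.markers import Marker | ||||
|     #     m = Marker('extra == "test"') | ||||
|     #     m.evaluate({"extra": ""})      # False: no extra was requested | ||||
|     #     m.evaluate({"extra": "test"})  # True: the "test" extra matches | ||||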
| 
 | ||||
|     @property | ||||
|     def has_hash_options(self) -> bool: | ||||
|         """Return whether any known-good hashes are specified as options. | ||||
| 
 | ||||
|         These activate --require-hashes mode; hashes specified as part of a | ||||
|         URL do not. | ||||
| 
 | ||||
|         """ | ||||
|         return bool(self.hash_options) | ||||
| 
 | ||||
|     def hashes(self, trust_internet: bool = True) -> Hashes: | ||||
|         """Return a hash-comparer that considers my option- and URL-based | ||||
|         hashes to be known-good. | ||||
| 
 | ||||
|         Hashes in URLs--ones embedded in the requirements file, not ones | ||||
|         downloaded from an index server--are almost peers with ones from | ||||
|         flags. They satisfy --require-hashes (whether it was implicitly or | ||||
|         explicitly activated) but do not activate it. md5 and sha224 are not | ||||
|         allowed in flags, which should nudge people toward good algos. We | ||||
|         always OR all hashes together, even ones from URLs. | ||||
| 
 | ||||
|         :param trust_internet: Whether to trust URL-based (#md5=...) hashes | ||||
|             downloaded from the internet, as by populate_link() | ||||
| 
 | ||||
|         """ | ||||
|         good_hashes = self.hash_options.copy() | ||||
|         if trust_internet: | ||||
|             link = self.link | ||||
|         elif self.is_direct and self.user_supplied: | ||||
|             link = self.original_link | ||||
|         else: | ||||
|             link = None | ||||
|         if link and link.hash: | ||||
|             assert link.hash_name is not None | ||||
|             good_hashes.setdefault(link.hash_name, []).append(link.hash) | ||||
|         return Hashes(good_hashes) | ||||
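| 
 | ||||
|     # The mapping handed to Hashes() above groups digests by algorithm name, | ||||
|     # e.g. (digest shortened/hypothetical): | ||||
|     # | ||||
|     #     {"sha256": ["9f86d081884c7d659a2feaa0c55ad015a..."]} | ||||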
| 
 | ||||
|     def from_path(self) -> Optional[str]: | ||||
|         """Format a nice indicator to show where this "comes from" """ | ||||
|         if self.req is None: | ||||
|             return None | ||||
|         s = str(self.req) | ||||
|         if self.comes_from: | ||||
|             comes_from: Optional[str] | ||||
|             if isinstance(self.comes_from, str): | ||||
|                 comes_from = self.comes_from | ||||
|             else: | ||||
|                 comes_from = self.comes_from.from_path() | ||||
|             if comes_from: | ||||
|                 s += "->" + comes_from | ||||
|         return s | ||||
| 
 | ||||
|     def ensure_build_location( | ||||
|         self, build_dir: str, autodelete: bool, parallel_builds: bool | ||||
|     ) -> str: | ||||
|         assert build_dir is not None | ||||
|         if self._temp_build_dir is not None: | ||||
|             assert self._temp_build_dir.path | ||||
|             return self._temp_build_dir.path | ||||
|         if self.req is None: | ||||
|             # Some systems have /tmp as a symlink which confuses custom | ||||
|             # builds (such as numpy). Thus, we ensure that the real path | ||||
|             # is returned. | ||||
|             self._temp_build_dir = TempDirectory( | ||||
|                 kind=tempdir_kinds.REQ_BUILD, globally_managed=True | ||||
|             ) | ||||
| 
 | ||||
|             return self._temp_build_dir.path | ||||
| 
 | ||||
|         # This is the only remaining place where we manually determine the path | ||||
|         # for the temporary directory. It is only needed for editables where | ||||
|         # it is the value of the --src option. | ||||
| 
 | ||||
|         # When parallel builds are enabled, add a UUID to the build directory | ||||
|         # name so multiple builds do not interfere with each other. | ||||
|         dir_name: str = canonicalize_name(self.req.name) | ||||
|         if parallel_builds: | ||||
|             dir_name = f"{dir_name}_{uuid.uuid4().hex}" | ||||
| 
 | ||||
|         # FIXME: Is there a better place to create the build_dir? (hg and bzr | ||||
|         # need this) | ||||
|         if not os.path.exists(build_dir): | ||||
|             logger.debug("Creating directory %s", build_dir) | ||||
|             os.makedirs(build_dir) | ||||
|         actual_build_dir = os.path.join(build_dir, dir_name) | ||||
|         # `None` indicates that we respect the globally-configured deletion | ||||
|         # settings, which is what we actually want when auto-deleting. | ||||
|         delete_arg = None if autodelete else False | ||||
|         return TempDirectory( | ||||
|             path=actual_build_dir, | ||||
|             delete=delete_arg, | ||||
|             kind=tempdir_kinds.REQ_BUILD, | ||||
|             globally_managed=True, | ||||
|         ).path | ||||
| 
 | ||||
|     def _set_requirement(self) -> None: | ||||
|         """Set requirement after generating metadata.""" | ||||
|         assert self.req is None | ||||
|         assert self.metadata is not None | ||||
|         assert self.source_dir is not None | ||||
| 
 | ||||
|         # Construct a Requirement object from the generated metadata | ||||
|         if isinstance(parse_version(self.metadata["Version"]), Version): | ||||
|             op = "==" | ||||
|         else: | ||||
|             op = "===" | ||||
| 
 | ||||
|         self.req = get_requirement( | ||||
|             "".join( | ||||
|                 [ | ||||
|                     self.metadata["Name"], | ||||
|                     op, | ||||
|                     self.metadata["Version"], | ||||
|                 ] | ||||
|             ) | ||||
|         ) | ||||
| 
 | ||||
|     def warn_on_mismatching_name(self) -> None: | ||||
|         assert self.req is not None | ||||
|         metadata_name = canonicalize_name(self.metadata["Name"]) | ||||
|         if canonicalize_name(self.req.name) == metadata_name: | ||||
|             # Everything is fine. | ||||
|             return | ||||
| 
 | ||||
|         # If we're here, there's a mismatch. Log a warning about it. | ||||
|         logger.warning( | ||||
|             "Generating metadata for package %s " | ||||
|             "produced metadata for project name %s. Fix your " | ||||
|             "#egg=%s fragments.", | ||||
|             self.name, | ||||
|             metadata_name, | ||||
|             self.name, | ||||
|         ) | ||||
|         self.req = get_requirement(metadata_name) | ||||
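| 
 | ||||
|     # For context, canonicalize_name() (vendored from `packaging`) lower-cases | ||||
|     # the name and collapses runs of '-', '_' and '.' into single dashes: | ||||
|     # | ||||
|     #     from packaging.utils import canonicalize_name | ||||
|     #     canonicalize_name("Zope.Interface")  # -> "zope-interface" | ||||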
| 
 | ||||
|     def check_if_exists(self, use_user_site: bool) -> None: | ||||
|         """Find an installed distribution that satisfies or conflicts | ||||
|         with this requirement, and set self.satisfied_by or | ||||
|         self.should_reinstall appropriately. | ||||
|         """ | ||||
|         if self.req is None: | ||||
|             return | ||||
|         existing_dist = get_default_environment().get_distribution(self.req.name) | ||||
|         if not existing_dist: | ||||
|             return | ||||
| 
 | ||||
|         version_compatible = self.req.specifier.contains( | ||||
|             existing_dist.version, | ||||
|             prereleases=True, | ||||
|         ) | ||||
|         if not version_compatible: | ||||
|             self.satisfied_by = None | ||||
|             if use_user_site: | ||||
|                 if existing_dist.in_usersite: | ||||
|                     self.should_reinstall = True | ||||
|                 elif running_under_virtualenv() and existing_dist.in_site_packages: | ||||
|                     raise InstallationError( | ||||
|                         f"Will not install to the user site because it will " | ||||
|                         f"lack sys.path precedence to {existing_dist.raw_name} " | ||||
|                         f"in {existing_dist.location}" | ||||
|                     ) | ||||
|             else: | ||||
|                 self.should_reinstall = True | ||||
|         else: | ||||
|             if self.editable: | ||||
|                 self.should_reinstall = True | ||||
|                 # when installing editables, nothing pre-existing should ever | ||||
|                 # satisfy | ||||
|                 self.satisfied_by = None | ||||
|             else: | ||||
|                 self.satisfied_by = existing_dist | ||||
| 
 | ||||
|     # Things valid for wheels | ||||
|     @property | ||||
|     def is_wheel(self) -> bool: | ||||
|         if not self.link: | ||||
|             return False | ||||
|         return self.link.is_wheel | ||||
| 
 | ||||
|     @property | ||||
|     def is_wheel_from_cache(self) -> bool: | ||||
|         # When True, it means that this InstallRequirement is a local wheel file in the | ||||
|         # cache of locally built wheels. | ||||
|         return self.cached_wheel_source_link is not None | ||||
| 
 | ||||
|     # Things valid for sdists | ||||
|     @property | ||||
|     def unpacked_source_directory(self) -> str: | ||||
|         assert self.source_dir, f"No source dir for {self}" | ||||
|         return os.path.join( | ||||
|             self.source_dir, self.link and self.link.subdirectory_fragment or "" | ||||
|         ) | ||||
| 
 | ||||
|     @property | ||||
|     def setup_py_path(self) -> str: | ||||
|         assert self.source_dir, f"No source dir for {self}" | ||||
|         setup_py = os.path.join(self.unpacked_source_directory, "setup.py") | ||||
| 
 | ||||
|         return setup_py | ||||
| 
 | ||||
|     @property | ||||
|     def setup_cfg_path(self) -> str: | ||||
|         assert self.source_dir, f"No source dir for {self}" | ||||
|         setup_cfg = os.path.join(self.unpacked_source_directory, "setup.cfg") | ||||
| 
 | ||||
|         return setup_cfg | ||||
| 
 | ||||
|     @property | ||||
|     def pyproject_toml_path(self) -> str: | ||||
|         assert self.source_dir, f"No source dir for {self}" | ||||
|         return make_pyproject_path(self.unpacked_source_directory) | ||||
| 
 | ||||
|     def load_pyproject_toml(self) -> None: | ||||
|         """Load the pyproject.toml file. | ||||
| 
 | ||||
|         After calling this routine, all of the attributes related to PEP 517 | ||||
|         processing for this requirement have been set. In particular, the | ||||
|         use_pep517 attribute can be used to determine whether we should | ||||
|         follow the PEP 517 or legacy (setup.py) code path. | ||||
|         """ | ||||
|         pyproject_toml_data = load_pyproject_toml( | ||||
|             self.use_pep517, self.pyproject_toml_path, self.setup_py_path, str(self) | ||||
|         ) | ||||
| 
 | ||||
|         if pyproject_toml_data is None: | ||||
|             assert not self.config_settings | ||||
|             self.use_pep517 = False | ||||
|             return | ||||
| 
 | ||||
|         self.use_pep517 = True | ||||
|         requires, backend, check, backend_path = pyproject_toml_data | ||||
|         self.requirements_to_check = check | ||||
|         self.pyproject_requires = requires | ||||
|         self.pep517_backend = ConfiguredBuildBackendHookCaller( | ||||
|             self, | ||||
|             self.unpacked_source_directory, | ||||
|             backend, | ||||
|             backend_path=backend_path, | ||||
|         ) | ||||
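| 
 | ||||
|     # The requires/backend/backend_path values unpacked above are read from | ||||
|     # the [build-system] table of pyproject.toml; a typical table (TOML, | ||||
|     # shown here as a comment) looks like: | ||||
|     # | ||||
|     #     [build-system] | ||||
|     #     requires = ["setuptools>=64", "wheel"] | ||||
|     #     build-backend = "setuptools.build_meta" | ||||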
| 
 | ||||
|     def isolated_editable_sanity_check(self) -> None: | ||||
|         """Check that an editable requirement if valid for use with PEP 517/518. | ||||
| 
 | ||||
|         This verifies that an editable that has a pyproject.toml either supports | ||||
|         PEP 660 or has a setup.py or a setup.cfg. | ||||
|         """ | ||||
|         if ( | ||||
|             self.editable | ||||
|             and self.use_pep517 | ||||
|             and not self.supports_pyproject_editable | ||||
|             and not os.path.isfile(self.setup_py_path) | ||||
|             and not os.path.isfile(self.setup_cfg_path) | ||||
|         ): | ||||
|             raise InstallationError( | ||||
|                 f"Project {self} has a 'pyproject.toml' and its build " | ||||
|                 f"backend is missing the 'build_editable' hook. Since it does not " | ||||
|                 f"have a 'setup.py' nor a 'setup.cfg', " | ||||
|                 f"it cannot be installed in editable mode. " | ||||
|                 f"Consider using a build backend that supports PEP 660." | ||||
|             ) | ||||
| 
 | ||||
|     def prepare_metadata(self) -> None: | ||||
|         """Ensure that project metadata is available. | ||||
| 
 | ||||
|         Under PEP 517 and PEP 660, call the backend hook to prepare the metadata. | ||||
|         Under legacy processing, call setup.py egg-info. | ||||
|         """ | ||||
|         assert self.source_dir, f"No source dir for {self}" | ||||
|         details = self.name or f"from {self.link}" | ||||
| 
 | ||||
|         if self.use_pep517: | ||||
|             assert self.pep517_backend is not None | ||||
|             if ( | ||||
|                 self.editable | ||||
|                 and self.permit_editable_wheels | ||||
|                 and self.supports_pyproject_editable | ||||
|             ): | ||||
|                 self.metadata_directory = generate_editable_metadata( | ||||
|                     build_env=self.build_env, | ||||
|                     backend=self.pep517_backend, | ||||
|                     details=details, | ||||
|                 ) | ||||
|             else: | ||||
|                 self.metadata_directory = generate_metadata( | ||||
|                     build_env=self.build_env, | ||||
|                     backend=self.pep517_backend, | ||||
|                     details=details, | ||||
|                 ) | ||||
|         else: | ||||
|             self.metadata_directory = generate_metadata_legacy( | ||||
|                 build_env=self.build_env, | ||||
|                 setup_py_path=self.setup_py_path, | ||||
|                 source_dir=self.unpacked_source_directory, | ||||
|                 isolated=self.isolated, | ||||
|                 details=details, | ||||
|             ) | ||||
| 
 | ||||
|         # Act on the newly generated metadata, based on the name and version. | ||||
|         if not self.name: | ||||
|             self._set_requirement() | ||||
|         else: | ||||
|             self.warn_on_mismatching_name() | ||||
| 
 | ||||
|         self.assert_source_matches_version() | ||||
| 
 | ||||
|     @property | ||||
|     def metadata(self) -> Any: | ||||
|         if not hasattr(self, "_metadata"): | ||||
|             self._metadata = self.get_dist().metadata | ||||
| 
 | ||||
|         return self._metadata | ||||
| 
 | ||||
|     def get_dist(self) -> BaseDistribution: | ||||
|         if self.metadata_directory: | ||||
|             return get_directory_distribution(self.metadata_directory) | ||||
|         elif self.local_file_path and self.is_wheel: | ||||
|             assert self.req is not None | ||||
|             return get_wheel_distribution( | ||||
|                 FilesystemWheel(self.local_file_path), | ||||
|                 canonicalize_name(self.req.name), | ||||
|             ) | ||||
|         raise AssertionError( | ||||
|             f"InstallRequirement {self} has no metadata directory and no wheel: " | ||||
|             f"can't make a distribution." | ||||
|         ) | ||||
| 
 | ||||
|     def assert_source_matches_version(self) -> None: | ||||
|         assert self.source_dir, f"No source dir for {self}" | ||||
|         version = self.metadata["version"] | ||||
|         if self.req and self.req.specifier and version not in self.req.specifier: | ||||
|             logger.warning( | ||||
|                 "Requested %s, but installing version %s", | ||||
|                 self, | ||||
|                 version, | ||||
|             ) | ||||
|         else: | ||||
|             logger.debug( | ||||
|                 "Source in %s has version %s, which satisfies requirement %s", | ||||
|                 display_path(self.source_dir), | ||||
|                 version, | ||||
|                 self, | ||||
|             ) | ||||
| 
 | ||||
|     # For both source distributions and editables | ||||
|     def ensure_has_source_dir( | ||||
|         self, | ||||
|         parent_dir: str, | ||||
|         autodelete: bool = False, | ||||
|         parallel_builds: bool = False, | ||||
|     ) -> None: | ||||
|         """Ensure that a source_dir is set. | ||||
| 
 | ||||
|         This will create a temporary build dir if the name of the requirement | ||||
|         isn't known yet. | ||||
| 
 | ||||
|         :param parent_dir: The ideal pip parent_dir for the source_dir. | ||||
|             Generally src_dir for editables and build_dir for sdists. | ||||
|         :return: self.source_dir | ||||
|         """ | ||||
|         if self.source_dir is None: | ||||
|             self.source_dir = self.ensure_build_location( | ||||
|                 parent_dir, | ||||
|                 autodelete=autodelete, | ||||
|                 parallel_builds=parallel_builds, | ||||
|             ) | ||||
| 
 | ||||
|     def needs_unpacked_archive(self, archive_source: Path) -> None: | ||||
|         assert self._archive_source is None | ||||
|         self._archive_source = archive_source | ||||
| 
 | ||||
|     def ensure_pristine_source_checkout(self) -> None: | ||||
|         """Ensure the source directory has not yet been built in.""" | ||||
|         assert self.source_dir is not None | ||||
|         if self._archive_source is not None: | ||||
|             unpack_file(str(self._archive_source), self.source_dir) | ||||
|         elif is_installable_dir(self.source_dir): | ||||
|             # If a checkout exists, it's unwise to keep going. | ||||
|             # Version inconsistencies are logged later, but do not fail | ||||
|             # the installation. | ||||
|             raise PreviousBuildDirError( | ||||
|                 f"pip can't proceed with requirements '{self}' due to a " | ||||
|                 f"pre-existing build directory ({self.source_dir}). This is likely " | ||||
|                 "due to a previous installation that failed . pip is " | ||||
|                 "being responsible and not assuming it can delete this. " | ||||
|                 "Please delete it and try again." | ||||
|             ) | ||||
| 
 | ||||
|     # For editable installations | ||||
|     def update_editable(self) -> None: | ||||
|         if not self.link: | ||||
|             logger.debug( | ||||
|                 "Cannot update repository at %s; repository location is unknown", | ||||
|                 self.source_dir, | ||||
|             ) | ||||
|             return | ||||
|         assert self.editable | ||||
|         assert self.source_dir | ||||
|         if self.link.scheme == "file": | ||||
|             # Static paths don't get updated | ||||
|             return | ||||
|         vcs_backend = vcs.get_backend_for_scheme(self.link.scheme) | ||||
|         # Editable requirements are validated in Requirement constructors. | ||||
|         # So here, if it's neither a path nor a valid VCS URL, it's a bug. | ||||
|         assert vcs_backend, f"Unsupported VCS URL {self.link.url}" | ||||
|         hidden_url = hide_url(self.link.url) | ||||
|         vcs_backend.obtain(self.source_dir, url=hidden_url, verbosity=0) | ||||
| 
 | ||||
|     # Top-level Actions | ||||
|     def uninstall( | ||||
|         self, auto_confirm: bool = False, verbose: bool = False | ||||
|     ) -> Optional[UninstallPathSet]: | ||||
|         """ | ||||
|         Uninstall the distribution currently satisfying this requirement. | ||||
| 
 | ||||
|         Prompts before removing or modifying files unless | ||||
|         ``auto_confirm`` is True. | ||||
| 
 | ||||
|         Refuses to delete or modify files outside of ``sys.prefix`` - | ||||
|         thus uninstallation within a virtual environment can only | ||||
|         modify that virtual environment, even if the virtualenv is | ||||
|         linked to global site-packages. | ||||
| 
 | ||||
|         """ | ||||
|         assert self.req | ||||
|         dist = get_default_environment().get_distribution(self.req.name) | ||||
|         if not dist: | ||||
|             logger.warning("Skipping %s as it is not installed.", self.name) | ||||
|             return None | ||||
|         logger.info("Found existing installation: %s", dist) | ||||
| 
 | ||||
|         uninstalled_pathset = UninstallPathSet.from_dist(dist) | ||||
|         uninstalled_pathset.remove(auto_confirm, verbose) | ||||
|         return uninstalled_pathset | ||||
| 
 | ||||
|     def _get_archive_name(self, path: str, parentdir: str, rootdir: str) -> str: | ||||
|         def _clean_zip_name(name: str, prefix: str) -> str: | ||||
|             assert name.startswith( | ||||
|                 prefix + os.path.sep | ||||
|             ), f"name {name!r} doesn't start with prefix {prefix!r}" | ||||
|             name = name[len(prefix) + 1 :] | ||||
|             name = name.replace(os.path.sep, "/") | ||||
|             return name | ||||
| 
 | ||||
|         assert self.req is not None | ||||
|         path = os.path.join(parentdir, path) | ||||
|         name = _clean_zip_name(path, rootdir) | ||||
|         return self.req.name + "/" + name | ||||
| 
 | ||||
|     def archive(self, build_dir: Optional[str]) -> None: | ||||
|         """Saves archive to provided build_dir. | ||||
| 
 | ||||
|         Used for saving downloaded VCS requirements as part of `pip download`. | ||||
|         """ | ||||
|         assert self.source_dir | ||||
|         if build_dir is None: | ||||
|             return | ||||
| 
 | ||||
|         create_archive = True | ||||
|         archive_name = "{}-{}.zip".format(self.name, self.metadata["version"]) | ||||
|         archive_path = os.path.join(build_dir, archive_name) | ||||
| 
 | ||||
|         if os.path.exists(archive_path): | ||||
|             response = ask_path_exists( | ||||
|                 f"The file {display_path(archive_path)} exists. (i)gnore, (w)ipe, " | ||||
|                 "(b)ackup, (a)bort ", | ||||
|                 ("i", "w", "b", "a"), | ||||
|             ) | ||||
|             if response == "i": | ||||
|                 create_archive = False | ||||
|             elif response == "w": | ||||
|                 logger.warning("Deleting %s", display_path(archive_path)) | ||||
|                 os.remove(archive_path) | ||||
|             elif response == "b": | ||||
|                 dest_file = backup_dir(archive_path) | ||||
|                 logger.warning( | ||||
|                     "Backing up %s to %s", | ||||
|                     display_path(archive_path), | ||||
|                     display_path(dest_file), | ||||
|                 ) | ||||
|                 shutil.move(archive_path, dest_file) | ||||
|             elif response == "a": | ||||
|                 sys.exit(-1) | ||||
| 
 | ||||
|         if not create_archive: | ||||
|             return | ||||
| 
 | ||||
|         zip_output = zipfile.ZipFile( | ||||
|             archive_path, | ||||
|             "w", | ||||
|             zipfile.ZIP_DEFLATED, | ||||
|             allowZip64=True, | ||||
|         ) | ||||
|         with zip_output: | ||||
|             dir = os.path.normcase(os.path.abspath(self.unpacked_source_directory)) | ||||
|             for dirpath, dirnames, filenames in os.walk(dir): | ||||
|                 for dirname in dirnames: | ||||
|                     dir_arcname = self._get_archive_name( | ||||
|                         dirname, | ||||
|                         parentdir=dirpath, | ||||
|                         rootdir=dir, | ||||
|                     ) | ||||
|                     zipdir = zipfile.ZipInfo(dir_arcname + "/") | ||||
|                     zipdir.external_attr = 0x1ED << 16  # 0o755 | ||||
|                     zip_output.writestr(zipdir, "") | ||||
|                 for filename in filenames: | ||||
|                     file_arcname = self._get_archive_name( | ||||
|                         filename, | ||||
|                         parentdir=dirpath, | ||||
|                         rootdir=dir, | ||||
|                     ) | ||||
|                     filename = os.path.join(dirpath, filename) | ||||
|                     zip_output.write(filename, file_arcname) | ||||
| 
 | ||||
|         logger.info("Saved %s", display_path(archive_path)) | ||||
| 
 | ||||
|     def install( | ||||
|         self, | ||||
|         global_options: Optional[Sequence[str]] = None, | ||||
|         root: Optional[str] = None, | ||||
|         home: Optional[str] = None, | ||||
|         prefix: Optional[str] = None, | ||||
|         warn_script_location: bool = True, | ||||
|         use_user_site: bool = False, | ||||
|         pycompile: bool = True, | ||||
|     ) -> None: | ||||
|         assert self.req is not None | ||||
|         scheme = get_scheme( | ||||
|             self.req.name, | ||||
|             user=use_user_site, | ||||
|             home=home, | ||||
|             root=root, | ||||
|             isolated=self.isolated, | ||||
|             prefix=prefix, | ||||
|         ) | ||||
| 
 | ||||
|         if self.editable and not self.is_wheel: | ||||
|             deprecated( | ||||
|                 reason=( | ||||
|                     f"Legacy editable install of {self} (setup.py develop) " | ||||
|                     "is deprecated." | ||||
|                 ), | ||||
|                 replacement=( | ||||
|                     "to add a pyproject.toml or enable --use-pep517, " | ||||
|                     "and use setuptools >= 64. " | ||||
|                     "If the resulting installation is not behaving as expected, " | ||||
|                     "try using --config-settings editable_mode=compat. " | ||||
|                     "Please consult the setuptools documentation for more information" | ||||
|                 ), | ||||
|                 gone_in="25.1", | ||||
|                 issue=11457, | ||||
|             ) | ||||
|             if self.config_settings: | ||||
|                 logger.warning( | ||||
|                     "--config-settings ignored for legacy editable install of %s. " | ||||
|                     "Consider upgrading to a version of setuptools " | ||||
|                     "that supports PEP 660 (>= 64).", | ||||
|                     self, | ||||
|                 ) | ||||
|             install_editable_legacy( | ||||
|                 global_options=global_options if global_options is not None else [], | ||||
|                 prefix=prefix, | ||||
|                 home=home, | ||||
|                 use_user_site=use_user_site, | ||||
|                 name=self.req.name, | ||||
|                 setup_py_path=self.setup_py_path, | ||||
|                 isolated=self.isolated, | ||||
|                 build_env=self.build_env, | ||||
|                 unpacked_source_directory=self.unpacked_source_directory, | ||||
|             ) | ||||
|             self.install_succeeded = True | ||||
|             return | ||||
| 
 | ||||
|         assert self.is_wheel | ||||
|         assert self.local_file_path | ||||
| 
 | ||||
|         install_wheel( | ||||
|             self.req.name, | ||||
|             self.local_file_path, | ||||
|             scheme=scheme, | ||||
|             req_description=str(self.req), | ||||
|             pycompile=pycompile, | ||||
|             warn_script_location=warn_script_location, | ||||
|             direct_url=self.download_info if self.is_direct else None, | ||||
|             requested=self.user_supplied, | ||||
|         ) | ||||
|         self.install_succeeded = True | ||||
| 
 | ||||
| 
 | ||||
| def check_invalid_constraint_type(req: InstallRequirement) -> str: | ||||
|     # Check for unsupported forms | ||||
|     problem = "" | ||||
|     if not req.name: | ||||
|         problem = "Unnamed requirements are not allowed as constraints" | ||||
|     elif req.editable: | ||||
|         problem = "Editable requirements are not allowed as constraints" | ||||
|     elif req.extras: | ||||
|         problem = "Constraints cannot have extras" | ||||
| 
 | ||||
|     if problem: | ||||
|         deprecated( | ||||
|             reason=( | ||||
|                 "Constraints are only allowed to take the form of a package " | ||||
|                 "name and a version specifier. Other forms were originally " | ||||
|                 "permitted as an accident of the implementation, but were " | ||||
|                 "undocumented. The new implementation of the resolver no " | ||||
|                 "longer supports these forms." | ||||
|             ), | ||||
|             replacement="replacing the constraint with a requirement", | ||||
|             # No plan yet for when the new resolver becomes default | ||||
|             gone_in=None, | ||||
|             issue=8210, | ||||
|         ) | ||||
| 
 | ||||
|     return problem | ||||
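| 
 | ||||
| # Illustrative constraint-file lines under the rules enforced above (names | ||||
| # and version specifiers only; the URL and package names are made up): | ||||
| # | ||||
| #     requests>=2.31,<3                        # allowed | ||||
| #     -e git+https://example.com/x.git#egg=x   # flagged: editable | ||||
| #     requests[security]==2.31.0               # flagged: has extras | ||||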
| 
 | ||||
| 
 | ||||
| def _has_option(options: Values, reqs: List[InstallRequirement], option: str) -> bool: | ||||
|     if getattr(options, option, None): | ||||
|         return True | ||||
|     for req in reqs: | ||||
|         if getattr(req, option, None): | ||||
|             return True | ||||
|     return False | ||||
| 
 | ||||
| 
 | ||||
| def check_legacy_setup_py_options( | ||||
|     options: Values, | ||||
|     reqs: List[InstallRequirement], | ||||
| ) -> None: | ||||
|     has_build_options = _has_option(options, reqs, "build_options") | ||||
|     has_global_options = _has_option(options, reqs, "global_options") | ||||
|     if has_build_options or has_global_options: | ||||
|         deprecated( | ||||
|             reason="--build-option and --global-option are deprecated.", | ||||
|             issue=11859, | ||||
|             replacement="to use --config-settings", | ||||
|             gone_in=None, | ||||
|         ) | ||||
|         logger.warning( | ||||
|             "Implying --no-binary=:all: due to the presence of " | ||||
|             "--build-option / --global-option. " | ||||
|         ) | ||||
|         options.format_control.disallow_binaries() | ||||
|  | @ -0,0 +1,82 @@ | |||
| import logging | ||||
| from collections import OrderedDict | ||||
| from typing import Dict, List | ||||
| 
 | ||||
| from pip._vendor.packaging.utils import canonicalize_name | ||||
| 
 | ||||
| from pip._internal.req.req_install import InstallRequirement | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| class RequirementSet: | ||||
|     def __init__(self, check_supported_wheels: bool = True) -> None: | ||||
|         """Create a RequirementSet.""" | ||||
| 
 | ||||
|         self.requirements: Dict[str, InstallRequirement] = OrderedDict() | ||||
|         self.check_supported_wheels = check_supported_wheels | ||||
| 
 | ||||
|         self.unnamed_requirements: List[InstallRequirement] = [] | ||||
| 
 | ||||
|     def __str__(self) -> str: | ||||
|         requirements = sorted( | ||||
|             (req for req in self.requirements.values() if not req.comes_from), | ||||
|             key=lambda req: canonicalize_name(req.name or ""), | ||||
|         ) | ||||
|         return " ".join(str(req.req) for req in requirements) | ||||
| 
 | ||||
|     def __repr__(self) -> str: | ||||
|         requirements = sorted( | ||||
|             self.requirements.values(), | ||||
|             key=lambda req: canonicalize_name(req.name or ""), | ||||
|         ) | ||||
| 
 | ||||
|         format_string = "<{classname} object; {count} requirement(s): {reqs}>" | ||||
|         return format_string.format( | ||||
|             classname=self.__class__.__name__, | ||||
|             count=len(requirements), | ||||
|             reqs=", ".join(str(req.req) for req in requirements), | ||||
|         ) | ||||
| 
 | ||||
|     def add_unnamed_requirement(self, install_req: InstallRequirement) -> None: | ||||
|         assert not install_req.name | ||||
|         self.unnamed_requirements.append(install_req) | ||||
| 
 | ||||
|     def add_named_requirement(self, install_req: InstallRequirement) -> None: | ||||
|         assert install_req.name | ||||
| 
 | ||||
|         project_name = canonicalize_name(install_req.name) | ||||
|         self.requirements[project_name] = install_req | ||||
| 
 | ||||
|     def has_requirement(self, name: str) -> bool: | ||||
|         project_name = canonicalize_name(name) | ||||
| 
 | ||||
|         return ( | ||||
|             project_name in self.requirements | ||||
|             and not self.requirements[project_name].constraint | ||||
|         ) | ||||
| 
 | ||||
|     def get_requirement(self, name: str) -> InstallRequirement: | ||||
|         project_name = canonicalize_name(name) | ||||
| 
 | ||||
|         if project_name in self.requirements: | ||||
|             return self.requirements[project_name] | ||||
| 
 | ||||
|         raise KeyError(f"No project with the name {name!r}") | ||||
| 
 | ||||
|     @property | ||||
|     def all_requirements(self) -> List[InstallRequirement]: | ||||
|         return self.unnamed_requirements + list(self.requirements.values()) | ||||
| 
 | ||||
|     @property | ||||
|     def requirements_to_install(self) -> List[InstallRequirement]: | ||||
|         """Return the list of requirements that need to be installed. | ||||
| 
 | ||||
|         TODO remove this property together with the legacy resolver, since the new | ||||
|              resolver only returns requirements that need to be installed. | ||||
|         """ | ||||
|         return [ | ||||
|             install_req | ||||
|             for install_req in self.all_requirements | ||||
|             if not install_req.constraint and not install_req.satisfied_by | ||||
|         ] | ||||
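| 
 | ||||
| # A minimal usage sketch (pip internals, not a stable public API), assuming | ||||
| # install_req_from_line from pip._internal.req.constructors: | ||||
| # | ||||
| #     from pip._internal.req.constructors import install_req_from_line | ||||
| #     req_set = RequirementSet(check_supported_wheels=False) | ||||
| #     req_set.add_named_requirement(install_req_from_line("requests==2.31.0")) | ||||
| #     req_set.has_requirement("Requests")  # True: lookups are canonicalized | ||||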
|  | @ -0,0 +1,633 @@ | |||
| import functools | ||||
| import os | ||||
| import sys | ||||
| import sysconfig | ||||
| from importlib.util import cache_from_source | ||||
| from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Set, Tuple | ||||
| 
 | ||||
| from pip._internal.exceptions import LegacyDistutilsInstall, UninstallMissingRecord | ||||
| from pip._internal.locations import get_bin_prefix, get_bin_user | ||||
| from pip._internal.metadata import BaseDistribution | ||||
| from pip._internal.utils.compat import WINDOWS | ||||
| from pip._internal.utils.egg_link import egg_link_path_from_location | ||||
| from pip._internal.utils.logging import getLogger, indent_log | ||||
| from pip._internal.utils.misc import ask, normalize_path, renames, rmtree | ||||
| from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory | ||||
| from pip._internal.utils.virtualenv import running_under_virtualenv | ||||
| 
 | ||||
| logger = getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| def _script_names( | ||||
|     bin_dir: str, script_name: str, is_gui: bool | ||||
| ) -> Generator[str, None, None]: | ||||
|     """Create the fully qualified name of the files created by | ||||
|     {console,gui}_scripts for the given ``dist``. | ||||
|     Returns the list of file names | ||||
|     """ | ||||
|     exe_name = os.path.join(bin_dir, script_name) | ||||
|     yield exe_name | ||||
|     if not WINDOWS: | ||||
|         return | ||||
|     yield f"{exe_name}.exe" | ||||
|     yield f"{exe_name}.exe.manifest" | ||||
|     if is_gui: | ||||
|         yield f"{exe_name}-script.pyw" | ||||
|     else: | ||||
|         yield f"{exe_name}-script.py" | ||||
| 
 | ||||
| 
 | ||||
| def _unique( | ||||
|     fn: Callable[..., Generator[Any, None, None]] | ||||
| ) -> Callable[..., Generator[Any, None, None]]: | ||||
|     @functools.wraps(fn) | ||||
|     def unique(*args: Any, **kw: Any) -> Generator[Any, None, None]: | ||||
|         seen: Set[Any] = set() | ||||
|         for item in fn(*args, **kw): | ||||
|             if item not in seen: | ||||
|                 seen.add(item) | ||||
|                 yield item | ||||
| 
 | ||||
|     return unique | ||||
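| 
 | ||||
| # For example, a decorated generator yields each distinct item once per call: | ||||
| # | ||||
| #     @_unique | ||||
| #     def letters() -> Generator[str, None, None]: | ||||
| #         yield from "abacus" | ||||
| # | ||||
| #     list(letters())  # -> ['a', 'b', 'c', 'u', 's'] | ||||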
| 
 | ||||
| 
 | ||||
| @_unique | ||||
| def uninstallation_paths(dist: BaseDistribution) -> Generator[str, None, None]: | ||||
|     """ | ||||
|     Yield all the uninstallation paths for dist based on RECORD-without-.py[co] | ||||
| 
 | ||||
|     Yield paths to all the files in RECORD. For each .py file in RECORD, add | ||||
|     the .pyc and .pyo in the same directory. | ||||
| 
 | ||||
|     UninstallPathSet.add() takes care of the __pycache__ .py[co]. | ||||
| 
 | ||||
|     If RECORD is not found, an error is raised, including any available | ||||
|     information from the INSTALLER file. | ||||
| 
 | ||||
|     https://packaging.python.org/specifications/recording-installed-packages/ | ||||
|     """ | ||||
|     location = dist.location | ||||
|     assert location is not None, "not installed" | ||||
| 
 | ||||
|     entries = dist.iter_declared_entries() | ||||
|     if entries is None: | ||||
|         raise UninstallMissingRecord(distribution=dist) | ||||
| 
 | ||||
|     for entry in entries: | ||||
|         path = os.path.join(location, entry) | ||||
|         yield path | ||||
|         if path.endswith(".py"): | ||||
|             dn, fn = os.path.split(path) | ||||
|             base = fn[:-3] | ||||
|             path = os.path.join(dn, base + ".pyc") | ||||
|             yield path | ||||
|             path = os.path.join(dn, base + ".pyo") | ||||
|             yield path | ||||
| 
 | ||||
| 
 | ||||
| def compact(paths: Iterable[str]) -> Set[str]: | ||||
|     """Compact a path set to contain the minimal number of paths | ||||
|     necessary to contain all paths in the set. If /a/path/ and | ||||
|     /a/path/to/a/file.txt are both in the set, leave only the | ||||
|     shorter path.""" | ||||
| 
 | ||||
|     sep = os.path.sep | ||||
|     short_paths: Set[str] = set() | ||||
|     for path in sorted(paths, key=len): | ||||
|         should_skip = any( | ||||
|             path.startswith(shortpath.rstrip("*")) | ||||
|             and path[len(shortpath.rstrip("*").rstrip(sep))] == sep | ||||
|             for shortpath in short_paths | ||||
|         ) | ||||
|         if not should_skip: | ||||
|             short_paths.add(path) | ||||
|     return short_paths | ||||
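| 
 | ||||
| # For example (POSIX-style paths, purely illustrative): | ||||
| # | ||||
| #     compact({"/a/path", "/a/path/to/a/file.txt", "/a/other.txt"}) | ||||
| #     # -> {"/a/path", "/a/other.txt"} | ||||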
| 
 | ||||
| 
 | ||||
| def compress_for_rename(paths: Iterable[str]) -> Set[str]: | ||||
|     """Returns a set containing the paths that need to be renamed. | ||||
| 
 | ||||
|     This set may include directories when the original sequence of paths | ||||
|     included every file on disk. | ||||
|     """ | ||||
|     case_map = {os.path.normcase(p): p for p in paths} | ||||
|     remaining = set(case_map) | ||||
|     unchecked = sorted({os.path.split(p)[0] for p in case_map.values()}, key=len) | ||||
|     wildcards: Set[str] = set() | ||||
| 
 | ||||
|     def norm_join(*a: str) -> str: | ||||
|         return os.path.normcase(os.path.join(*a)) | ||||
| 
 | ||||
|     for root in unchecked: | ||||
|         if any(os.path.normcase(root).startswith(w) for w in wildcards): | ||||
|             # This directory has already been handled. | ||||
|             continue | ||||
| 
 | ||||
|         all_files: Set[str] = set() | ||||
|         all_subdirs: Set[str] = set() | ||||
|         for dirname, subdirs, files in os.walk(root): | ||||
|             all_subdirs.update(norm_join(root, dirname, d) for d in subdirs) | ||||
|             all_files.update(norm_join(root, dirname, f) for f in files) | ||||
|         # If all the files we found are in our remaining set of files to | ||||
|         # remove, then remove them from the latter set and add a wildcard | ||||
|         # for the directory. | ||||
|         if not (all_files - remaining): | ||||
|             remaining.difference_update(all_files) | ||||
|             wildcards.add(root + os.sep) | ||||
| 
 | ||||
|     return set(map(case_map.__getitem__, remaining)) | wildcards | ||||
| 
 | ||||
| 
 | ||||
| def compress_for_output_listing(paths: Iterable[str]) -> Tuple[Set[str], Set[str]]: | ||||
|     """Returns a tuple of 2 sets of which paths to display to user | ||||
| 
 | ||||
|     The first set contains the paths that would be deleted. A package's files | ||||
|     are not listed individually; instead the package's top-level directory is | ||||
|     shown with a '*' appended, to signify that all of its contents are removed. | ||||
| 
 | ||||
|     The second set contains files that would have been skipped in the above | ||||
|     folders. | ||||
|     """ | ||||
| 
 | ||||
|     will_remove = set(paths) | ||||
|     will_skip = set() | ||||
| 
 | ||||
|     # Determine folders and files | ||||
|     folders = set() | ||||
|     files = set() | ||||
|     for path in will_remove: | ||||
|         if path.endswith(".pyc"): | ||||
|             continue | ||||
|         if path.endswith("__init__.py") or ".dist-info" in path: | ||||
|             folders.add(os.path.dirname(path)) | ||||
|         files.add(path) | ||||
| 
 | ||||
|     _normcased_files = set(map(os.path.normcase, files)) | ||||
| 
 | ||||
|     folders = compact(folders) | ||||
| 
 | ||||
|     # This walks the tree using os.walk to not miss extra folders | ||||
|     # that might get added. | ||||
|     for folder in folders: | ||||
|         for dirpath, _, dirfiles in os.walk(folder): | ||||
|             for fname in dirfiles: | ||||
|                 if fname.endswith(".pyc"): | ||||
|                     continue | ||||
| 
 | ||||
|                 file_ = os.path.join(dirpath, fname) | ||||
|                 if ( | ||||
|                     os.path.isfile(file_) | ||||
|                     and os.path.normcase(file_) not in _normcased_files | ||||
|                 ): | ||||
|                     # We are skipping this file. Add it to the set. | ||||
|                     will_skip.add(file_) | ||||
| 
 | ||||
|     will_remove = files | {os.path.join(folder, "*") for folder in folders} | ||||
| 
 | ||||
|     return will_remove, will_skip | ||||
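| 
 | ||||
| # Toy example (paths hypothetical): for RECORD entries of a package "demo", | ||||
| # the package folders gain a wildcard entry alongside the raw file paths: | ||||
| # | ||||
| #     remove, skip = compress_for_output_listing([ | ||||
| #         "/sp/demo/__init__.py", | ||||
| #         "/sp/demo/core.py", | ||||
| #         "/sp/demo-1.0.dist-info/METADATA", | ||||
| #     ]) | ||||
| #     # remove includes "/sp/demo/*" and "/sp/demo-1.0.dist-info/*" | ||||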
| 
 | ||||
| 
 | ||||
| class StashedUninstallPathSet: | ||||
|     """A set of file rename operations to stash files while | ||||
|     tentatively uninstalling them.""" | ||||
| 
 | ||||
|     def __init__(self) -> None: | ||||
|         # Mapping from source file root to [Adjacent]TempDirectory | ||||
|         # for files under that directory. | ||||
|         self._save_dirs: Dict[str, TempDirectory] = {} | ||||
|         # (old path, new path) tuples for each move that may need | ||||
|         # to be undone. | ||||
|         self._moves: List[Tuple[str, str]] = [] | ||||
| 
 | ||||
|     def _get_directory_stash(self, path: str) -> str: | ||||
|         """Stashes a directory. | ||||
| 
 | ||||
|         Directories are stashed adjacent to their original location if | ||||
|         possible, or else moved/copied into the user's temp dir.""" | ||||
| 
 | ||||
|         try: | ||||
|             save_dir: TempDirectory = AdjacentTempDirectory(path) | ||||
|         except OSError: | ||||
|             save_dir = TempDirectory(kind="uninstall") | ||||
|         self._save_dirs[os.path.normcase(path)] = save_dir | ||||
| 
 | ||||
|         return save_dir.path | ||||
| 
 | ||||
|     def _get_file_stash(self, path: str) -> str: | ||||
|         """Stashes a file. | ||||
| 
 | ||||
|         If no root has been provided, one will be created for the directory | ||||
|         in the user's temp directory.""" | ||||
|         path = os.path.normcase(path) | ||||
|         head, old_head = os.path.dirname(path), None | ||||
|         save_dir = None | ||||
| 
 | ||||
|         while head != old_head: | ||||
|             try: | ||||
|                 save_dir = self._save_dirs[head] | ||||
|                 break | ||||
|             except KeyError: | ||||
|                 pass | ||||
|             head, old_head = os.path.dirname(head), head | ||||
|         else: | ||||
|             # Did not find any suitable root | ||||
|             head = os.path.dirname(path) | ||||
|             save_dir = TempDirectory(kind="uninstall") | ||||
|             self._save_dirs[head] = save_dir | ||||
| 
 | ||||
|         relpath = os.path.relpath(path, head) | ||||
|         if relpath and relpath != os.path.curdir: | ||||
|             return os.path.join(save_dir.path, relpath) | ||||
|         return save_dir.path | ||||
| 
 | ||||
|     def stash(self, path: str) -> str: | ||||
|         """Stashes the directory or file and returns its new location. | ||||
|         Handle symlinks as files to avoid modifying the symlink targets. | ||||
|         """ | ||||
|         path_is_dir = os.path.isdir(path) and not os.path.islink(path) | ||||
|         if path_is_dir: | ||||
|             new_path = self._get_directory_stash(path) | ||||
|         else: | ||||
|             new_path = self._get_file_stash(path) | ||||
| 
 | ||||
|         self._moves.append((path, new_path)) | ||||
|         if path_is_dir and os.path.isdir(new_path): | ||||
|             # If we're moving a directory, we need to | ||||
|             # remove the destination first or else it will be | ||||
|             # moved to inside the existing directory. | ||||
|             # We just created new_path ourselves, so it will | ||||
|             # be removable. | ||||
|             os.rmdir(new_path) | ||||
|         renames(path, new_path) | ||||
|         return new_path | ||||
| 
 | ||||
|     def commit(self) -> None: | ||||
|         """Commits the uninstall by removing stashed files.""" | ||||
|         for save_dir in self._save_dirs.values(): | ||||
|             save_dir.cleanup() | ||||
|         self._moves = [] | ||||
|         self._save_dirs = {} | ||||
| 
 | ||||
|     def rollback(self) -> None: | ||||
|         """Undoes the uninstall by moving stashed files back.""" | ||||
|         for p in self._moves: | ||||
|             logger.info("Moving to %s\n from %s", *p) | ||||
| 
 | ||||
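|         # Each recorded move is (original path, stash path), so here | ||||
|         # "new_path" is the original location being restored and "path" is | ||||
|         # the stashed copy. | ||||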
|         for new_path, path in self._moves: | ||||
|             try: | ||||
|                 logger.debug("Replacing %s from %s", new_path, path) | ||||
|                 if os.path.isfile(new_path) or os.path.islink(new_path): | ||||
|                     os.unlink(new_path) | ||||
|                 elif os.path.isdir(new_path): | ||||
|                     rmtree(new_path) | ||||
|                 renames(path, new_path) | ||||
|             except OSError as ex: | ||||
|                 logger.error("Failed to restore %s", new_path) | ||||
|                 logger.debug("Exception: %s", ex) | ||||
| 
 | ||||
|         self.commit() | ||||
| 
 | ||||
|     @property | ||||
|     def can_rollback(self) -> bool: | ||||
|         return bool(self._moves) | ||||
| 
 | ||||
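| # A rough sketch of how the class above is driven (illustrative only, not | ||||
| # code that pip runs): | ||||
| # | ||||
| #     stashed = StashedUninstallPathSet() | ||||
| #     stashed.stash(path)   # move the path into a temporary stash | ||||
| #     stashed.rollback()    # on failure, undo every recorded move | ||||
| #     stashed.commit()      # on success, delete the stashed copies | ||||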
| 
 | ||||
| class UninstallPathSet: | ||||
|     """A set of file paths to be removed in the uninstallation of a | ||||
|     requirement.""" | ||||
| 
 | ||||
|     def __init__(self, dist: BaseDistribution) -> None: | ||||
|         self._paths: Set[str] = set() | ||||
|         self._refuse: Set[str] = set() | ||||
|         self._pth: Dict[str, UninstallPthEntries] = {} | ||||
|         self._dist = dist | ||||
|         self._moved_paths = StashedUninstallPathSet() | ||||
|         # Create local cache of normalize_path results. Creating an UninstallPathSet | ||||
|         # can result in hundreds/thousands of redundant calls to normalize_path with | ||||
|         # the same args, which hurts performance. | ||||
|         self._normalize_path_cached = functools.lru_cache(normalize_path) | ||||
| 
 | ||||
|     def _permitted(self, path: str) -> bool: | ||||
|         """ | ||||
|         Return True if the given path is one we are permitted to | ||||
|         remove/modify, False otherwise. | ||||
|         """ | ||||
|         # Equivalent to is_local(), but caches the normalized sys.prefix. | ||||
|         if not running_under_virtualenv(): | ||||
|             return True | ||||
|         return path.startswith(self._normalize_path_cached(sys.prefix)) | ||||
| 
 | ||||
|     def add(self, path: str) -> None: | ||||
|         head, tail = os.path.split(path) | ||||
| 
 | ||||
|         # we normalize the head to resolve parent directory symlinks, but not | ||||
|         # the tail, since we only want to uninstall symlinks, not their targets | ||||
|         path = os.path.join(self._normalize_path_cached(head), os.path.normcase(tail)) | ||||
| 
 | ||||
|         if not os.path.exists(path): | ||||
|             return | ||||
|         if self._permitted(path): | ||||
|             self._paths.add(path) | ||||
|         else: | ||||
|             self._refuse.add(path) | ||||
| 
 | ||||
|         # __pycache__ files can show up after 'installed-files.txt' is created, | ||||
|         # due to imports | ||||
|         if os.path.splitext(path)[1] == ".py": | ||||
|             self.add(cache_from_source(path)) | ||||
| 
 | ||||
|     def add_pth(self, pth_file: str, entry: str) -> None: | ||||
|         pth_file = self._normalize_path_cached(pth_file) | ||||
|         if self._permitted(pth_file): | ||||
|             if pth_file not in self._pth: | ||||
|                 self._pth[pth_file] = UninstallPthEntries(pth_file) | ||||
|             self._pth[pth_file].add(entry) | ||||
|         else: | ||||
|             self._refuse.add(pth_file) | ||||
| 
 | ||||
|     def remove(self, auto_confirm: bool = False, verbose: bool = False) -> None: | ||||
|         """Remove paths in ``self._paths`` with confirmation (unless | ||||
|         ``auto_confirm`` is True).""" | ||||
| 
 | ||||
|         if not self._paths: | ||||
|             logger.info( | ||||
|                 "Can't uninstall '%s'. No files were found to uninstall.", | ||||
|                 self._dist.raw_name, | ||||
|             ) | ||||
|             return | ||||
| 
 | ||||
|         dist_name_version = f"{self._dist.raw_name}-{self._dist.raw_version}" | ||||
|         logger.info("Uninstalling %s:", dist_name_version) | ||||
| 
 | ||||
|         with indent_log(): | ||||
|             if auto_confirm or self._allowed_to_proceed(verbose): | ||||
|                 moved = self._moved_paths | ||||
| 
 | ||||
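|                 # compress_for_rename() collapses the path set so that a | ||||
|                 # directory whose entire contents are being removed is | ||||
|                 # stashed once, rather than one file at a time. | ||||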
|                 for_rename = compress_for_rename(self._paths) | ||||
| 
 | ||||
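|                 # Paths are stashed rather than deleted outright, so the | ||||
|                 # removal can be rolled back if a later step fails. | ||||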
|                 for path in sorted(compact(for_rename)): | ||||
|                     moved.stash(path) | ||||
|                     logger.verbose("Removing file or directory %s", path) | ||||
| 
 | ||||
|                 for pth in self._pth.values(): | ||||
|                     pth.remove() | ||||
| 
 | ||||
|                 logger.info("Successfully uninstalled %s", dist_name_version) | ||||
| 
 | ||||
|     def _allowed_to_proceed(self, verbose: bool) -> bool: | ||||
|         """Display which files would be deleted and prompt for confirmation""" | ||||
| 
 | ||||
|         def _display(msg: str, paths: Iterable[str]) -> None: | ||||
|             if not paths: | ||||
|                 return | ||||
| 
 | ||||
|             logger.info(msg) | ||||
|             with indent_log(): | ||||
|                 for path in sorted(compact(paths)): | ||||
|                     logger.info(path) | ||||
| 
 | ||||
|         if not verbose: | ||||
|             will_remove, will_skip = compress_for_output_listing(self._paths) | ||||
|         else: | ||||
|             # In verbose mode, display all the files that are going to be | ||||
|             # deleted. | ||||
|             will_remove = set(self._paths) | ||||
|             will_skip = set() | ||||
| 
 | ||||
|         _display("Would remove:", will_remove) | ||||
|         _display("Would not remove (might be manually added):", will_skip) | ||||
|         _display("Would not remove (outside of prefix):", self._refuse) | ||||
|         if verbose: | ||||
|             _display("Will actually move:", compress_for_rename(self._paths)) | ||||
| 
 | ||||
|         return ask("Proceed (Y/n)? ", ("y", "n", "")) != "n" | ||||
| 
 | ||||
|     def rollback(self) -> None: | ||||
|         """Rollback the changes previously made by remove().""" | ||||
|         if not self._moved_paths.can_rollback: | ||||
|             logger.error( | ||||
|                 "Can't roll back %s; was not uninstalled", | ||||
|                 self._dist.raw_name, | ||||
|             ) | ||||
|             return | ||||
|         logger.info("Rolling back uninstall of %s", self._dist.raw_name) | ||||
|         self._moved_paths.rollback() | ||||
|         for pth in self._pth.values(): | ||||
|             pth.rollback() | ||||
| 
 | ||||
|     def commit(self) -> None: | ||||
|         """Remove temporary save dir: rollback will no longer be possible.""" | ||||
|         self._moved_paths.commit() | ||||
| 
 | ||||
|     @classmethod | ||||
|     def from_dist(cls, dist: BaseDistribution) -> "UninstallPathSet": | ||||
|         dist_location = dist.location | ||||
|         info_location = dist.info_location | ||||
|         if dist_location is None: | ||||
|             logger.info( | ||||
|                 "Not uninstalling %s since it is not installed", | ||||
|                 dist.canonical_name, | ||||
|             ) | ||||
|             return cls(dist) | ||||
| 
 | ||||
|         normalized_dist_location = normalize_path(dist_location) | ||||
|         if not dist.local: | ||||
|             logger.info( | ||||
|                 "Not uninstalling %s at %s, outside environment %s", | ||||
|                 dist.canonical_name, | ||||
|                 normalized_dist_location, | ||||
|                 sys.prefix, | ||||
|             ) | ||||
|             return cls(dist) | ||||
| 
 | ||||
|         if normalized_dist_location in { | ||||
|             p | ||||
|             for p in {sysconfig.get_path("stdlib"), sysconfig.get_path("platstdlib")} | ||||
|             if p | ||||
|         }: | ||||
|             logger.info( | ||||
|                 "Not uninstalling %s at %s, as it is in the standard library.", | ||||
|                 dist.canonical_name, | ||||
|                 normalized_dist_location, | ||||
|             ) | ||||
|             return cls(dist) | ||||
| 
 | ||||
|         paths_to_remove = cls(dist) | ||||
|         develop_egg_link = egg_link_path_from_location(dist.raw_name) | ||||
| 
 | ||||
|         # Distribution is installed with metadata in a "flat" .egg-info | ||||
|         # directory. This means it is not a modern .dist-info installation, an | ||||
|         # egg, or legacy editable. | ||||
|         setuptools_flat_installation = ( | ||||
|             dist.installed_with_setuptools_egg_info | ||||
|             and info_location is not None | ||||
|             and os.path.exists(info_location) | ||||
|             # If dist is editable and the location points to a ``.egg-info``, | ||||
|             # we are in fact in the legacy editable case. | ||||
|             and not info_location.endswith(f"{dist.setuptools_filename}.egg-info") | ||||
|         ) | ||||
| 
 | ||||
|         # The order of the uninstall cases matters: when two installs of the | ||||
|         # same package exist, pip needs to uninstall the currently detected | ||||
|         # version. | ||||
|         if setuptools_flat_installation: | ||||
|             if info_location is not None: | ||||
|                 paths_to_remove.add(info_location) | ||||
|             installed_files = dist.iter_declared_entries() | ||||
|             if installed_files is not None: | ||||
|                 for installed_file in installed_files: | ||||
|                     paths_to_remove.add(os.path.join(dist_location, installed_file)) | ||||
|             # FIXME: need a test for this elif block | ||||
|             # occurs with --single-version-externally-managed/--record outside | ||||
|             # of pip | ||||
|             elif dist.is_file("top_level.txt"): | ||||
|                 try: | ||||
|                     namespace_packages = dist.read_text("namespace_packages.txt") | ||||
|                 except FileNotFoundError: | ||||
|                     namespaces = [] | ||||
|                 else: | ||||
|                     namespaces = namespace_packages.splitlines(keepends=False) | ||||
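|                 # Namespace package directories may be shared with other | ||||
|                 # installed distributions, so they are not removed. | ||||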
|                 for top_level_pkg in [ | ||||
|                     p | ||||
|                     for p in dist.read_text("top_level.txt").splitlines() | ||||
|                     if p and p not in namespaces | ||||
|                 ]: | ||||
|                     path = os.path.join(dist_location, top_level_pkg) | ||||
|                     paths_to_remove.add(path) | ||||
|                     paths_to_remove.add(f"{path}.py") | ||||
|                     paths_to_remove.add(f"{path}.pyc") | ||||
|                     paths_to_remove.add(f"{path}.pyo") | ||||
| 
 | ||||
|         elif dist.installed_by_distutils: | ||||
|             raise LegacyDistutilsInstall(distribution=dist) | ||||
| 
 | ||||
|         elif dist.installed_as_egg: | ||||
|             # package installed by easy_install | ||||
|             # We cannot match on dist.egg_name because it can vary slightly, | ||||
|             # e.g. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg | ||||
|             paths_to_remove.add(dist_location) | ||||
|             easy_install_egg = os.path.split(dist_location)[1] | ||||
|             easy_install_pth = os.path.join( | ||||
|                 os.path.dirname(dist_location), | ||||
|                 "easy-install.pth", | ||||
|             ) | ||||
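|             # easy_install records the egg in easy-install.pth with a | ||||
|             # leading "./", so the entry must be matched in that form. | ||||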
|             paths_to_remove.add_pth(easy_install_pth, "./" + easy_install_egg) | ||||
| 
 | ||||
|         elif dist.installed_with_dist_info: | ||||
|             for path in uninstallation_paths(dist): | ||||
|                 paths_to_remove.add(path) | ||||
| 
 | ||||
|         elif develop_egg_link: | ||||
|             # PEP 660 modern editable is handled in the ``.dist-info`` case | ||||
|             # above, so this only covers the setuptools-style editable. | ||||
|             with open(develop_egg_link) as fh: | ||||
|                 link_pointer = os.path.normcase(fh.readline().strip()) | ||||
|                 normalized_link_pointer = paths_to_remove._normalize_path_cached( | ||||
|                     link_pointer | ||||
|                 ) | ||||
|             assert os.path.samefile( | ||||
|                 normalized_link_pointer, normalized_dist_location | ||||
|             ), ( | ||||
|                 f"Egg-link {develop_egg_link} (to {link_pointer}) does not match " | ||||
|                 f"installed location of {dist.raw_name} (at {dist_location})" | ||||
|             ) | ||||
|             paths_to_remove.add(develop_egg_link) | ||||
|             easy_install_pth = os.path.join( | ||||
|                 os.path.dirname(develop_egg_link), "easy-install.pth" | ||||
|             ) | ||||
|             paths_to_remove.add_pth(easy_install_pth, dist_location) | ||||
| 
 | ||||
|         else: | ||||
|             logger.debug( | ||||
|                 "Not sure how to uninstall: %s - Check: %s", | ||||
|                 dist, | ||||
|                 dist_location, | ||||
|             ) | ||||
| 
 | ||||
|         if dist.in_usersite: | ||||
|             bin_dir = get_bin_user() | ||||
|         else: | ||||
|             bin_dir = get_bin_prefix() | ||||
| 
 | ||||
|         # find scripts installed via the distutils scripts= argument | ||||
|         try: | ||||
|             for script in dist.iter_distutils_script_names(): | ||||
|                 paths_to_remove.add(os.path.join(bin_dir, script)) | ||||
|                 if WINDOWS: | ||||
|                     paths_to_remove.add(os.path.join(bin_dir, f"{script}.bat")) | ||||
|         except (FileNotFoundError, NotADirectoryError): | ||||
|             pass | ||||
| 
 | ||||
|         # find console_scripts and gui_scripts | ||||
|         def iter_scripts_to_remove( | ||||
|             dist: BaseDistribution, | ||||
|             bin_dir: str, | ||||
|         ) -> Generator[str, None, None]: | ||||
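|             # _script_names() expands an entry point name into the concrete | ||||
|             # files created for it (on Windows this includes the .exe | ||||
|             # wrapper variants, for example). | ||||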
|             for entry_point in dist.iter_entry_points(): | ||||
|                 if entry_point.group == "console_scripts": | ||||
|                     yield from _script_names(bin_dir, entry_point.name, False) | ||||
|                 elif entry_point.group == "gui_scripts": | ||||
|                     yield from _script_names(bin_dir, entry_point.name, True) | ||||
| 
 | ||||
|         for s in iter_scripts_to_remove(dist, bin_dir): | ||||
|             paths_to_remove.add(s) | ||||
| 
 | ||||
|         return paths_to_remove | ||||
| 
 | ||||
| 
 | ||||
| class UninstallPthEntries: | ||||
|     def __init__(self, pth_file: str) -> None: | ||||
|         self.file = pth_file | ||||
|         self.entries: Set[str] = set() | ||||
|         self._saved_lines: Optional[List[bytes]] = None | ||||
| 
 | ||||
|     def add(self, entry: str) -> None: | ||||
|         entry = os.path.normcase(entry) | ||||
|         # On Windows, os.path.normcase converts the entry to use | ||||
|         # backslashes.  This is correct for entries that describe absolute | ||||
|         # paths outside of site-packages, but all the others use forward | ||||
|         # slashes. | ||||
|         # os.path.splitdrive is used instead of os.path.isabs because isabs | ||||
|         # treats non-absolute paths with drive letter markings like c:foo\bar | ||||
|         # as absolute paths. It also does not recognize UNC paths if they don't | ||||
|         # have more than "\\server\share". Valid examples: "\\server\share\" or | ||||
|         # "\\server\share\folder". | ||||
|         if WINDOWS and not os.path.splitdrive(entry)[0]: | ||||
|             entry = entry.replace("\\", "/") | ||||
|         self.entries.add(entry) | ||||
| 
 | ||||
|     def remove(self) -> None: | ||||
|         logger.verbose("Removing pth entries from %s:", self.file) | ||||
| 
 | ||||
|         # If the file doesn't exist, log a warning and return | ||||
|         if not os.path.isfile(self.file): | ||||
|             logger.warning("Cannot remove entries from nonexistent file %s", self.file) | ||||
|             return | ||||
|         with open(self.file, "rb") as fh: | ||||
|             # Read raw bytes so the file's line endings can be detected below | ||||
|             # and the exact contents restored verbatim on rollback. | ||||
|             lines = fh.readlines() | ||||
|             self._saved_lines = lines | ||||
|         if any(b"\r\n" in line for line in lines): | ||||
|             endline = "\r\n" | ||||
|         else: | ||||
|             endline = "\n" | ||||
|         # handle missing trailing newline | ||||
|         if lines and not lines[-1].endswith(endline.encode("utf-8")): | ||||
|             lines[-1] = lines[-1] + endline.encode("utf-8") | ||||
|         for entry in self.entries: | ||||
|             try: | ||||
|                 logger.verbose("Removing entry: %s", entry) | ||||
|                 lines.remove((entry + endline).encode("utf-8")) | ||||
|             except ValueError: | ||||
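|                 # The entry was not present in the file; nothing to remove. | ||||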
|                 pass | ||||
|         with open(self.file, "wb") as fh: | ||||
|             fh.writelines(lines) | ||||
| 
 | ||||
|     def rollback(self) -> bool: | ||||
|         if self._saved_lines is None: | ||||
|             logger.error("Cannot roll back changes to %s, none were made", self.file) | ||||
|             return False | ||||
|         logger.debug("Rolling %s back to previous state", self.file) | ||||
|         with open(self.file, "wb") as fh: | ||||
|             fh.writelines(self._saved_lines) | ||||
|         return True | ||||