mirror of
https://github.com/YunoHost-Apps/ffsync_ynh.git
synced 2024-09-03 18:26:38 +02:00
174 lines
6.5 KiB
Python
174 lines
6.5 KiB
Python
import functools
|
|
import logging
|
|
|
|
from pip._vendor import six
|
|
from pip._vendor.packaging.utils import canonicalize_name
|
|
from pip._vendor.resolvelib import BaseReporter, ResolutionImpossible
|
|
from pip._vendor.resolvelib import Resolver as RLResolver
|
|
|
|
from pip._internal.exceptions import InstallationError
|
|
from pip._internal.req.req_set import RequirementSet
|
|
from pip._internal.resolution.base import BaseResolver
|
|
from pip._internal.resolution.resolvelib.provider import PipProvider
|
|
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
|
|
|
|
from .factory import Factory
|
|
|
|
if MYPY_CHECK_RUNNING:
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
from pip._vendor.resolvelib.resolvers import Result
|
|
|
|
from pip._internal.cache import WheelCache
|
|
from pip._internal.index.package_finder import PackageFinder
|
|
from pip._internal.operations.prepare import RequirementPreparer
|
|
from pip._internal.req.req_install import InstallRequirement
|
|
from pip._internal.resolution.base import InstallRequirementProvider
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Resolver(BaseResolver):
|
|
def __init__(
|
|
self,
|
|
preparer, # type: RequirementPreparer
|
|
finder, # type: PackageFinder
|
|
wheel_cache, # type: Optional[WheelCache]
|
|
make_install_req, # type: InstallRequirementProvider
|
|
use_user_site, # type: bool
|
|
ignore_dependencies, # type: bool
|
|
ignore_installed, # type: bool
|
|
ignore_requires_python, # type: bool
|
|
force_reinstall, # type: bool
|
|
upgrade_strategy, # type: str
|
|
py_version_info=None, # type: Optional[Tuple[int, ...]]
|
|
):
|
|
super(Resolver, self).__init__()
|
|
self.factory = Factory(
|
|
finder=finder,
|
|
preparer=preparer,
|
|
make_install_req=make_install_req,
|
|
force_reinstall=force_reinstall,
|
|
ignore_installed=ignore_installed,
|
|
ignore_requires_python=ignore_requires_python,
|
|
py_version_info=py_version_info,
|
|
)
|
|
self.ignore_dependencies = ignore_dependencies
|
|
self._result = None # type: Optional[Result]
|
|
|
|
def resolve(self, root_reqs, check_supported_wheels):
|
|
# type: (List[InstallRequirement], bool) -> RequirementSet
|
|
|
|
# FIXME: Implement constraints.
|
|
if any(r.constraint for r in root_reqs):
|
|
raise InstallationError("Constraints are not yet supported.")
|
|
|
|
provider = PipProvider(
|
|
factory=self.factory,
|
|
ignore_dependencies=self.ignore_dependencies,
|
|
)
|
|
reporter = BaseReporter()
|
|
resolver = RLResolver(provider, reporter)
|
|
|
|
requirements = [
|
|
self.factory.make_requirement_from_install_req(r)
|
|
for r in root_reqs
|
|
]
|
|
|
|
try:
|
|
self._result = resolver.resolve(requirements)
|
|
|
|
except ResolutionImpossible as e:
|
|
error = self.factory.get_installation_error(e)
|
|
if not error:
|
|
# TODO: This needs fixing, we need to look at the
|
|
# factory.get_installation_error infrastructure, as that
|
|
# doesn't really allow for the logger.critical calls I'm
|
|
# using here.
|
|
for req, parent in e.causes:
|
|
logger.critical(
|
|
"Could not find a version that satisfies " +
|
|
"the requirement " +
|
|
str(req) +
|
|
("" if parent is None else " (from {})".format(
|
|
parent.name
|
|
))
|
|
)
|
|
raise InstallationError(
|
|
"No matching distribution found for " +
|
|
", ".join([r.name for r, _ in e.causes])
|
|
)
|
|
raise
|
|
six.raise_from(error, e)
|
|
|
|
req_set = RequirementSet(check_supported_wheels=check_supported_wheels)
|
|
for candidate in self._result.mapping.values():
|
|
ireq = provider.get_install_requirement(candidate)
|
|
if ireq is None:
|
|
continue
|
|
ireq.should_reinstall = self.factory.should_reinstall(candidate)
|
|
req_set.add_named_requirement(ireq)
|
|
|
|
return req_set
|
|
|
|
def get_installation_order(self, req_set):
|
|
# type: (RequirementSet) -> List[InstallRequirement]
|
|
"""Create a list that orders given requirements for installation.
|
|
|
|
The returned list should contain all requirements in ``req_set``,
|
|
so the caller can loop through it and have a requirement installed
|
|
before the requiring thing.
|
|
|
|
The current implementation walks the resolved dependency graph, and
|
|
make sure every node has a greater "weight" than all its parents.
|
|
"""
|
|
assert self._result is not None, "must call resolve() first"
|
|
weights = {} # type: Dict[Optional[str], int]
|
|
|
|
graph = self._result.graph
|
|
key_count = len(self._result.mapping) + 1 # Packages plus sentinal.
|
|
while len(weights) < key_count:
|
|
progressed = False
|
|
for key in graph:
|
|
if key in weights:
|
|
continue
|
|
parents = list(graph.iter_parents(key))
|
|
if not all(p in weights for p in parents):
|
|
continue
|
|
if parents:
|
|
weight = max(weights[p] for p in parents) + 1
|
|
else:
|
|
weight = 0
|
|
weights[key] = weight
|
|
progressed = True
|
|
|
|
# FIXME: This check will fail if there are unbreakable cycles.
|
|
# Implement something to forcifully break them up to continue.
|
|
if not progressed:
|
|
raise InstallationError(
|
|
"Could not determine installation order due to cicular "
|
|
"dependency."
|
|
)
|
|
|
|
sorted_items = sorted(
|
|
req_set.requirements.items(),
|
|
key=functools.partial(_req_set_item_sorter, weights=weights),
|
|
reverse=True,
|
|
)
|
|
return [ireq for _, ireq in sorted_items]
|
|
|
|
|
|
def _req_set_item_sorter(
|
|
item, # type: Tuple[str, InstallRequirement]
|
|
weights, # type: Dict[Optional[str], int]
|
|
):
|
|
# type: (...) -> Tuple[int, str]
|
|
"""Key function used to sort install requirements for installation.
|
|
|
|
Based on the "weight" mapping calculated in ``get_installation_order()``.
|
|
The canonical package name is returned as the second member as a tie-
|
|
breaker to ensure the result is predictable, which is useful in tests.
|
|
"""
|
|
name = canonicalize_name(item[0])
|
|
return weights[name], name
|