That whole thing about Specifier is completely overengineered, let's have a more simple check for version requirements...

This commit is contained in:
Alexandre Aubin 2020-05-03 19:46:21 +02:00
parent fc07468051
commit 9c0ccd0b4f
2 changed files with 35 additions and 343 deletions

2
debian/control vendored
View file

@ -14,7 +14,7 @@ Depends: ${python:Depends}, ${misc:Depends}
, moulinette (>= 3.7), ssowat (>= 3.7)
, python-psutil, python-requests, python-dnspython, python-openssl
, python-miniupnpc, python-dbus, python-jinja2
, python-toml
, python-toml, python-packaging
, apt, apt-transport-https
, nginx, nginx-extras (>=1.6.2)
, php-fpm, php-ldap, php-intl

View file

@ -23,347 +23,12 @@ import os
import logging
from moulinette.utils.process import check_output
from packaging import version
logger = logging.getLogger('yunohost.utils.packages')
YUNOHOST_PACKAGES = ['yunohost', 'yunohost-admin', 'moulinette', 'ssowat']
# Exceptions -----------------------------------------------------------------
class InvalidSpecifier(ValueError):
"""An invalid specifier was found."""
# Version specifier ----------------------------------------------------------
# The packaging package has been a nice inspiration for the following classes.
# See: https://github.com/pypa/packaging
class Specifier(object):
"""Unique package version specifier
Restrict a package version according to the `spec`. It must be a string
containing a relation from the list below followed by a version number
value. The relations allowed are, as defined by the Debian Policy Manual:
- `<<` for strictly lower
- `<=` for lower or equal
- `=` for exactly equal
- `>=` for greater or equal
- `>>` for strictly greater
"""
_regex_str = (
r"""
(?P<relation>(<<|<=|=|>=|>>))
\s*
(?P<version>[^,;\s)]*)
"""
)
_regex = re.compile(
r"^\s*" + _regex_str + r"\s*$", re.VERBOSE | re.IGNORECASE)
_relations = {
"<<": "lower_than",
"<=": "lower_or_equal_than",
"=": "equal",
">=": "greater_or_equal_than",
">>": "greater_than",
}
def __init__(self, spec):
if isinstance(spec, basestring):
match = self._regex.search(spec)
if not match:
raise InvalidSpecifier("Invalid specifier: '{0}'".format(spec))
self._spec = (
match.group("relation").strip(),
match.group("version").strip(),
)
elif isinstance(spec, self.__class__):
self._spec = spec._spec
else:
return NotImplemented
def __repr__(self):
return "<Specifier({0!r})>".format(str(self))
def __str__(self):
return "{0}{1}".format(*self._spec)
def __hash__(self):
return hash(self._spec)
def __eq__(self, other):
if isinstance(other, basestring):
try:
other = self.__class__(other)
except InvalidSpecifier:
return NotImplemented
elif not isinstance(other, self.__class__):
return NotImplemented
return self._spec == other._spec
def __ne__(self, other):
if isinstance(other, basestring):
try:
other = self.__class__(other)
except InvalidSpecifier:
return NotImplemented
elif not isinstance(other, self.__class__):
return NotImplemented
return self._spec != other._spec
def __and__(self, other):
return self.intersection(other)
def __or__(self, other):
return self.union(other)
def _get_relation(self, op):
return getattr(self, "_compare_{0}".format(self._relations[op]))
def _compare_lower_than(self, version, spec):
return version_compare(version, spec) < 0
def _compare_lower_or_equal_than(self, version, spec):
return version_compare(version, spec) <= 0
def _compare_equal(self, version, spec):
return version_compare(version, spec) == 0
def _compare_greater_or_equal_than(self, version, spec):
return version_compare(version, spec) >= 0
def _compare_greater_than(self, version, spec):
return version_compare(version, spec) > 0
@property
def relation(self):
return self._spec[0]
@property
def version(self):
return self._spec[1]
def __contains__(self, item):
return self.contains(item)
def intersection(self, other):
"""Make the intersection of two specifiers
Return a new `SpecifierSet` with version specifier(s) common to the
specifier and the other.
Example:
>>> Specifier('>= 2.2') & '>> 2.2.1' == '>> 2.2.1'
>>> Specifier('>= 2.2') & '<< 2.3' == '>= 2.2, << 2.3'
"""
if isinstance(other, basestring):
try:
other = self.__class__(other)
except InvalidSpecifier:
return NotImplemented
elif not isinstance(other, self.__class__):
return NotImplemented
# store spec parts for easy access
rel1, v1 = self.relation, self.version
rel2, v2 = other.relation, other.version
result = []
if other == self:
result = [other]
elif rel1 == '=':
result = [self] if v1 in other else None
elif rel2 == '=':
result = [other] if v2 in self else None
elif v1 == v2:
result = [other if rel1[1] == '=' else self]
elif v2 in self or v1 in other:
is_self_greater = version_compare(v1, v2) > 0
if rel1[0] == rel2[0]:
if rel1[0] == '>':
result = [self if is_self_greater else other]
else:
result = [other if is_self_greater else self]
else:
result = [self, other]
return SpecifierSet(result if result is not None else '')
def union(self, other):
"""Make the union of two version specifiers
Return a new `SpecifierSet` with version specifiers from the
specifier and the other.
Example:
>>> Specifier('>= 2.2') | '<< 2.3' == '>= 2.2, << 2.3'
"""
if isinstance(other, basestring):
try:
other = self.__class__(other)
except InvalidSpecifier:
return NotImplemented
elif not isinstance(other, self.__class__):
return NotImplemented
return SpecifierSet([self, other])
def contains(self, item):
"""Check if the specifier contains an other
Return whether the item is contained in the version specifier.
Example:
>>> '2.2.1' in Specifier('<< 2.3')
>>> '2.4' not in Specifier('<< 2.3')
"""
return self._get_relation(self.relation)(item, self.version)
class SpecifierSet(object):
"""A set of package version specifiers
Combine several Specifier separated by a comma. It allows to restrict
more precisely a package version. Each package version specifier must be
meet. Note than an empty set of specifiers will always be meet.
"""
def __init__(self, specifiers):
if isinstance(specifiers, basestring):
specifiers = [s.strip() for s in specifiers.split(",")
if s.strip()]
parsed = set()
for specifier in specifiers:
parsed.add(Specifier(specifier))
self._specs = frozenset(parsed)
def __repr__(self):
return "<SpecifierSet({0!r})>".format(str(self))
def __str__(self):
return ",".join(sorted(str(s) for s in self._specs))
def __hash__(self):
return hash(self._specs)
def __and__(self, other):
return self.intersection(other)
def __or__(self, other):
return self.union(other)
def __eq__(self, other):
if isinstance(other, basestring):
other = SpecifierSet(other)
elif isinstance(other, Specifier):
other = SpecifierSet(str(other))
elif not isinstance(other, SpecifierSet):
return NotImplemented
return self._specs == other._specs
def __ne__(self, other):
if isinstance(other, basestring):
other = SpecifierSet(other)
elif isinstance(other, Specifier):
other = SpecifierSet(str(other))
elif not isinstance(other, SpecifierSet):
return NotImplemented
return self._specs != other._specs
def __len__(self):
return len(self._specs)
def __iter__(self):
return iter(self._specs)
def __contains__(self, item):
return self.contains(item)
def intersection(self, other):
"""Make the intersection of two specifiers sets
Return a new `SpecifierSet` with version specifier(s) common to the
set and the other.
Example:
>>> SpecifierSet('>= 2.2') & '>> 2.2.1' == '>> 2.2.1'
>>> SpecifierSet('>= 2.2, << 2.4') & '<< 2.3' == '>= 2.2, << 2.3'
>>> SpecifierSet('>= 2.2, << 2.3') & '>= 2.4' == ''
"""
if isinstance(other, basestring):
other = SpecifierSet(other)
elif not isinstance(other, SpecifierSet):
return NotImplemented
specifiers = set(self._specs | other._specs)
intersection = [specifiers.pop()] if specifiers else []
for specifier in specifiers:
parsed = set()
for spec in intersection:
inter = spec & specifier
if not inter:
parsed.clear()
break
# TODO: validate with other specs in parsed
parsed.update(inter._specs)
intersection = parsed
if not intersection:
break
return SpecifierSet(intersection)
def union(self, other):
"""Make the union of two specifiers sets
Return a new `SpecifierSet` with version specifiers from the set
and the other.
Example:
>>> SpecifierSet('>= 2.2') | '<< 2.3' == '>= 2.2, << 2.3'
"""
if isinstance(other, basestring):
other = SpecifierSet(other)
elif not isinstance(other, SpecifierSet):
return NotImplemented
specifiers = SpecifierSet([])
specifiers._specs = frozenset(self._specs | other._specs)
return specifiers
def contains(self, item):
"""Check if the set contains a version specifier
Return whether the item is contained in all version specifiers.
Example:
>>> '2.2.1' in SpecifierSet('>= 2.2, << 2.3')
>>> '2.4' not in SpecifierSet('>= 2.2, << 2.3')
"""
return all(
s.contains(item)
for s in self._specs
)
# Packages and cache helpers -------------------------------------------------
def get_ynh_package_version(package):
@ -383,14 +48,39 @@ def get_ynh_package_version(package):
return {"version": out[1].strip("()"),
"repo": out[2].strip(";")}
def meets_version_specifier(pkgname, specifier):
"""Check if a package installed version meets specifier"""
# In practice, this function is only used to check the yunohost version installed
assert pkgname in YUNOHOST_PACKAGES
return get_ynh_package_version(pkgname) in SpecifierSet(specifier)
def meets_version_specifier(pkg_name, specifier):
"""
Check if a package installed version meets specifier
specifier is something like ">> 1.2.3"
"""
# In practice, this function is only used to check the yunohost version
# installed.
# We'll trim any ~foobar in the current installed version because it's not
# handled correctly by version.parse, but we don't care so much in that
# context
assert pkg_name in YUNOHOST_PACKAGES
pkg_version = get_ynh_package_version(pkg_name)["version"]
pkg_version = pkg_version.split("~")[0]
pkg_version = version.parse(pkg_version)
# Extract operator and version specifier
op, req_version = re.search(r'(<<|<=|=|>=|>>) *([\d\.]+)', specifier).groups()
req_version = version.parse(req_version)
# cmp is a python builtin that returns (-1, 0, 1) depending on comparison
deb_operators = {
"<<": lambda v1, v2: cmp(v1, v2) in [-1],
"<=": lambda v1, v2: cmp(v1, v2) in [-1, 0],
"=": lambda v1, v2: cmp(v1, v2) in [0],
">=": lambda v1, v2: cmp(v1, v2) in [0, 1],
">>": lambda v1, v2: cmp(v1, v2) in [1]
}
return deb_operators[op](pkg_version, req_version)
# YunoHost related methods ---------------------------------------------------
def ynh_packages_version(*args, **kwargs):
# from cli the received arguments are:
@ -413,9 +103,11 @@ def dpkg_is_broken():
return any(re.match("^[0-9]+$", f)
for f in os.listdir("/var/lib/dpkg/updates/"))
def dpkg_lock_available():
return os.system("lsof /var/lib/dpkg/lock >/dev/null") != 0
def _list_upgradable_apt_packages():
# List upgradable packages