Merge pull request #1183 from YunoHost/rework-authenticator-system

Rework the authenticator system, split the LDAP stuff into auth part and utils part
This commit is contained in:
Alexandre Aubin 2021-08-27 18:44:22 +02:00 committed by GitHub
commit c3955f7aed
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 495 additions and 141 deletions

View file

@ -181,3 +181,12 @@ test-service:
only: only:
changes: changes:
- src/yunohost/service.py - src/yunohost/service.py
test-ldapauth:
extends: .test-stage
script:
- cd src/yunohost
- python3 -m pytest tests/test_ldapauth.py
only:
changes:
- src/yunohost/authenticators/*.py

View file

@ -33,18 +33,10 @@
# Global parameters # # Global parameters #
############################# #############################
_global: _global:
configuration: name: yunohost.admin
authenticate: authentication:
- api api: ldap_admin
authenticator: cli: null
default:
vendor: ldap
help: admin_password
parameters:
uri: ldap://localhost:389
base_dn: dc=yunohost,dc=org
user_rdn: cn=admin,dc=yunohost,dc=org
argument_auth: false
arguments: arguments:
-v: -v:
full: --version full: --version
@ -1402,9 +1394,9 @@ tools:
postinstall: postinstall:
action_help: YunoHost post-install action_help: YunoHost post-install
api: POST /postinstall api: POST /postinstall
configuration: authentication:
# We need to be able to run the postinstall without being authenticated, otherwise we can't run the postinstall # We need to be able to run the postinstall without being authenticated, otherwise we can't run the postinstall
authenticate: false api: null
arguments: arguments:
-d: -d:
full: --domain full: --domain

2
debian/control vendored
View file

@ -14,7 +14,7 @@ Depends: ${python3:Depends}, ${misc:Depends}
, python3-psutil, python3-requests, python3-dnspython, python3-openssl , python3-psutil, python3-requests, python3-dnspython, python3-openssl
, python3-miniupnpc, python3-dbus, python3-jinja2 , python3-miniupnpc, python3-dbus, python3-jinja2
, python3-toml, python3-packaging, python3-publicsuffix, , python3-toml, python3-packaging, python3-publicsuffix,
, python3-zeroconf, , python3-ldap, python3-zeroconf,
, apt, apt-transport-https, apt-utils, dirmngr , apt, apt-transport-https, apt-utils, dirmngr
, php7.3-common, php7.3-fpm, php7.3-ldap, php7.3-intl , php7.3-common, php7.3-fpm, php7.3-ldap, php7.3-intl
, mariadb-server, php7.3-mysql , mariadb-server, php7.3-mysql

View file

@ -372,6 +372,8 @@
"invalid_regex": "Invalid regex:'{regex}'", "invalid_regex": "Invalid regex:'{regex}'",
"ip6tables_unavailable": "You cannot play with ip6tables here. You are either in a container or your kernel does not support it", "ip6tables_unavailable": "You cannot play with ip6tables here. You are either in a container or your kernel does not support it",
"iptables_unavailable": "You cannot play with iptables here. You are either in a container or your kernel does not support it", "iptables_unavailable": "You cannot play with iptables here. You are either in a container or your kernel does not support it",
"ldap_server_down": "Unable to reach LDAP server",
"ldap_server_is_down_restart_it": "The LDAP service is down, attempt to restart it...",
"log_corrupted_md_file": "The YAML metadata file associated with logs is damaged: '{md_file}\nError: {error}'", "log_corrupted_md_file": "The YAML metadata file associated with logs is damaged: '{md_file}\nError: {error}'",
"log_link_to_log": "Full log of this operation: '<a href=\"#/tools/logs/{name}\" style=\"text-decoration:underline\">{desc}</a>'", "log_link_to_log": "Full log of this operation: '<a href=\"#/tools/logs/{name}\" style=\"text-decoration:underline\">{desc}</a>'",
"log_help_to_get_log": "To view the log of the operation '{desc}', use the command 'yunohost log show {name}{name}'", "log_help_to_get_log": "To view the log of the operation '{desc}', use the command 'yunohost log show {name}{name}'",
@ -479,6 +481,7 @@
"migrations_to_be_ran_manually": "Migration {id} has to be run manually. Please go to Tools → Migrations on the webadmin page, or run `yunohost tools migrations run`.", "migrations_to_be_ran_manually": "Migration {id} has to be run manually. Please go to Tools → Migrations on the webadmin page, or run `yunohost tools migrations run`.",
"not_enough_disk_space": "Not enough free space on '{path}'", "not_enough_disk_space": "Not enough free space on '{path}'",
"invalid_number": "Must be a number", "invalid_number": "Must be a number",
"invalid_password": "Invalid password",
"operation_interrupted": "The operation was manually interrupted?", "operation_interrupted": "The operation was manually interrupted?",
"packages_upgrade_failed": "Could not upgrade all the packages", "packages_upgrade_failed": "Could not upgrade all the packages",
"password_listed": "This password is among the most used passwords in the world. Please choose something more unique.", "password_listed": "This password is among the most used passwords in the world. Please choose something more unique.",

View file

@ -36,7 +36,7 @@ import urllib.parse
import tempfile import tempfile
from collections import OrderedDict from collections import OrderedDict
from moulinette import msignals, m18n, msettings from moulinette import Moulinette, m18n
from moulinette.core import MoulinetteError from moulinette.core import MoulinetteError
from moulinette.utils.log import getActionLogger from moulinette.utils.log import getActionLogger
from moulinette.utils.network import download_json from moulinette.utils.network import download_json
@ -649,7 +649,7 @@ def app_upgrade(app=[], url=None, file=None, force=False, no_safety_backup=False
m18n.n("app_upgrade_failed", app=app_instance_name, error=error) m18n.n("app_upgrade_failed", app=app_instance_name, error=error)
) )
failure_message_with_debug_instructions = operation_logger.error(error) failure_message_with_debug_instructions = operation_logger.error(error)
if msettings.get("interface") != "api": if Moulinette.interface.type != "api":
dump_app_log_extract_for_debugging(operation_logger) dump_app_log_extract_for_debugging(operation_logger)
# Script got manually interrupted ... N.B. : KeyboardInterrupt does not inherit from Exception # Script got manually interrupted ... N.B. : KeyboardInterrupt does not inherit from Exception
except (KeyboardInterrupt, EOFError): except (KeyboardInterrupt, EOFError):
@ -827,11 +827,11 @@ def app_install(
def confirm_install(confirm): def confirm_install(confirm):
# Ignore if there's nothing for confirm (good quality app), if --force is used # Ignore if there's nothing for confirm (good quality app), if --force is used
# or if request on the API (confirm already implemented on the API side) # or if request on the API (confirm already implemented on the API side)
if confirm is None or force or msettings.get("interface") == "api": if confirm is None or force or Moulinette.interface.type == "api":
return return
if confirm in ["danger", "thirdparty"]: if confirm in ["danger", "thirdparty"]:
answer = msignals.prompt( answer = Moulinette.prompt(
m18n.n("confirm_app_install_" + confirm, answers="Yes, I understand"), m18n.n("confirm_app_install_" + confirm, answers="Yes, I understand"),
color="red", color="red",
) )
@ -839,7 +839,7 @@ def app_install(
raise YunohostError("aborting") raise YunohostError("aborting")
else: else:
answer = msignals.prompt( answer = Moulinette.prompt(
m18n.n("confirm_app_install_" + confirm, answers="Y/N"), color="yellow" m18n.n("confirm_app_install_" + confirm, answers="Y/N"), color="yellow"
) )
if answer.upper() != "Y": if answer.upper() != "Y":
@ -1015,7 +1015,7 @@ def app_install(
error = m18n.n("app_install_script_failed") error = m18n.n("app_install_script_failed")
logger.error(m18n.n("app_install_failed", app=app_id, error=error)) logger.error(m18n.n("app_install_failed", app=app_id, error=error))
failure_message_with_debug_instructions = operation_logger.error(error) failure_message_with_debug_instructions = operation_logger.error(error)
if msettings.get("interface") != "api": if Moulinette.interface.type != "api":
dump_app_log_extract_for_debugging(operation_logger) dump_app_log_extract_for_debugging(operation_logger)
# Script got manually interrupted ... N.B. : KeyboardInterrupt does not inherit from Exception # Script got manually interrupted ... N.B. : KeyboardInterrupt does not inherit from Exception
except (KeyboardInterrupt, EOFError): except (KeyboardInterrupt, EOFError):
@ -2741,7 +2741,7 @@ class YunoHostArgumentFormatParser(object):
) )
try: try:
question.value = msignals.prompt( question.value = Moulinette.prompt(
text_for_user_input_in_cli, self.hide_user_input_in_prompt text_for_user_input_in_cli, self.hide_user_input_in_prompt
) )
except NotImplementedError: except NotImplementedError:

View file

@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
import os
import logging
import ldap
import ldap.sasl
import time
from moulinette import m18n
from moulinette.authentication import BaseAuthenticator
from yunohost.utils.error import YunohostError
logger = logging.getLogger("yunohost.authenticators.ldap_admin")
class Authenticator(BaseAuthenticator):
name = "ldap_admin"
def __init__(self, *args, **kwargs):
self.uri = "ldap://localhost:389"
self.basedn = "dc=yunohost,dc=org"
self.admindn = "cn=admin,dc=yunohost,dc=org"
def _authenticate_credentials(self, credentials=None):
# TODO : change authentication format
# to support another dn to support multi-admins
def _reconnect():
con = ldap.ldapobject.ReconnectLDAPObject(
self.uri, retry_max=10, retry_delay=0.5
)
con.simple_bind_s(self.admindn, credentials)
return con
try:
con = _reconnect()
except ldap.INVALID_CREDENTIALS:
raise YunohostError("invalid_password")
except ldap.SERVER_DOWN:
# ldap is down, attempt to restart it before really failing
logger.warning(m18n.n("ldap_server_is_down_restart_it"))
os.system("systemctl restart slapd")
time.sleep(10) # waits 10 secondes so we are sure that slapd has restarted
try:
con = _reconnect()
except ldap.SERVER_DOWN:
raise YunohostError("ldap_server_down")
# Check that we are indeed logged in with the expected identity
try:
# whoami_s return dn:..., then delete these 3 characters
who = con.whoami_s()[3:]
except Exception as e:
logger.warning("Error during ldap authentication process: %s", e)
raise
else:
if who != self.admindn:
raise YunohostError(f"Not logged with the appropriate identity ? Found {who}, expected {self.admindn} !?", raw_msg=True)
finally:
# Free the connection, we don't really need it to keep it open as the point is only to check authentication...
if con:
con.unbind_s()

View file

@ -38,7 +38,7 @@ from collections import OrderedDict
from functools import reduce from functools import reduce
from packaging import version from packaging import version
from moulinette import msignals, m18n, msettings from moulinette import Moulinette, m18n
from moulinette.utils import filesystem from moulinette.utils import filesystem
from moulinette.utils.log import getActionLogger from moulinette.utils.log import getActionLogger
from moulinette.utils.filesystem import read_file, mkdir, write_to_yaml, read_yaml from moulinette.utils.filesystem import read_file, mkdir, write_to_yaml, read_yaml
@ -1509,7 +1509,7 @@ class RestoreManager:
m18n.n("app_restore_failed", app=app_instance_name, error=error) m18n.n("app_restore_failed", app=app_instance_name, error=error)
) )
failure_message_with_debug_instructions = operation_logger.error(error) failure_message_with_debug_instructions = operation_logger.error(error)
if msettings.get("interface") != "api": if Moulinette.interface.type != "api":
dump_app_log_extract_for_debugging(operation_logger) dump_app_log_extract_for_debugging(operation_logger)
# Script got manually interrupted ... N.B. : KeyboardInterrupt does not inherit from Exception # Script got manually interrupted ... N.B. : KeyboardInterrupt does not inherit from Exception
except (KeyboardInterrupt, EOFError): except (KeyboardInterrupt, EOFError):
@ -1840,7 +1840,7 @@ class BackupMethod(object):
# Ask confirmation for copying # Ask confirmation for copying
if size > MB_ALLOWED_TO_ORGANIZE: if size > MB_ALLOWED_TO_ORGANIZE:
try: try:
i = msignals.prompt( i = Moulinette.prompt(
m18n.n( m18n.n(
"backup_ask_for_copying_if_needed", "backup_ask_for_copying_if_needed",
answers="y/N", answers="y/N",
@ -2344,7 +2344,7 @@ def backup_restore(name, system=[], apps=[], force=False):
if not force: if not force:
try: try:
# Ask confirmation for restoring # Ask confirmation for restoring
i = msignals.prompt( i = Moulinette.prompt(
m18n.n("restore_confirm_yunohost_installed", answers="y/N") m18n.n("restore_confirm_yunohost_installed", answers="y/N")
) )
except NotImplemented: except NotImplemented:
@ -2418,7 +2418,7 @@ def backup_list(with_info=False, human_readable=False):
def backup_download(name): def backup_download(name):
if msettings.get("interface") != "api": if Moulinette.interface.type != "api":
logger.error( logger.error(
"This option is only meant for the API/webadmin and doesn't make sense for the command line." "This option is only meant for the API/webadmin and doesn't make sense for the command line."
) )

View file

@ -28,7 +28,7 @@ import re
import os import os
import time import time
from moulinette import m18n, msettings from moulinette import m18n, Moulinette
from moulinette.utils import log from moulinette.utils import log
from moulinette.utils.filesystem import ( from moulinette.utils.filesystem import (
read_json, read_json,
@ -138,7 +138,7 @@ def diagnosis_show(
url = yunopaste(content) url = yunopaste(content)
logger.info(m18n.n("log_available_on_yunopaste", url=url)) logger.info(m18n.n("log_available_on_yunopaste", url=url))
if msettings.get("interface") == "api": if Moulinette.interface.type == "api":
return {"url": url} return {"url": url}
else: else:
return return
@ -219,7 +219,7 @@ def diagnosis_run(
if email: if email:
_email_diagnosis_issues() _email_diagnosis_issues()
if issues and msettings.get("interface") == "cli": if issues and Moulinette.interface.type == "cli":
logger.warning(m18n.n("diagnosis_display_tip")) logger.warning(m18n.n("diagnosis_display_tip"))
@ -595,7 +595,7 @@ class Diagnoser:
info[1].update(meta_data) info[1].update(meta_data)
s = m18n.n(info[0], **(info[1])) s = m18n.n(info[0], **(info[1]))
# In cli, we remove the html tags # In cli, we remove the html tags
if msettings.get("interface") != "api" or force_remove_html_tags: if Moulinette.interface.type != "api" or force_remove_html_tags:
s = s.replace("<cmd>", "'").replace("</cmd>", "'") s = s.replace("<cmd>", "'").replace("</cmd>", "'")
s = html_tags.sub("", s.replace("<br>", "\n")) s = html_tags.sub("", s.replace("<br>", "\n"))
else: else:

View file

@ -26,7 +26,7 @@
import os import os
import re import re
from moulinette import m18n, msettings, msignals from moulinette import m18n, Moulinette
from moulinette.core import MoulinetteError from moulinette.core import MoulinetteError
from yunohost.utils.error import YunohostError, YunohostValidationError from yunohost.utils.error import YunohostError, YunohostValidationError
from moulinette.utils.log import getActionLogger from moulinette.utils.log import getActionLogger
@ -241,8 +241,8 @@ def domain_remove(operation_logger, domain, remove_apps=False, force=False):
if apps_on_that_domain: if apps_on_that_domain:
if remove_apps: if remove_apps:
if msettings.get("interface") == "cli" and not force: if Moulinette.interface.type == "cli" and not force:
answer = msignals.prompt( answer = Moulinette.prompt(
m18n.n( m18n.n(
"domain_remove_confirm_apps_removal", "domain_remove_confirm_apps_removal",
apps="\n".join([x[1] for x in apps_on_that_domain]), apps="\n".join([x[1] for x in apps_on_that_domain]),
@ -348,7 +348,7 @@ def domain_dns_conf(domain, ttl=None):
for record in record_list: for record in record_list:
result += "\n{name} {ttl} IN {type} {value}".format(**record) result += "\n{name} {ttl} IN {type} {value}".format(**record)
if msettings.get("interface") == "cli": if Moulinette.interface.type == "cli":
logger.info(m18n.n("domain_dns_conf_is_just_a_recommendation")) logger.info(m18n.n("domain_dns_conf_is_just_a_recommendation"))
return result return result

View file

@ -31,7 +31,7 @@ import mimetypes
from glob import iglob from glob import iglob
from importlib import import_module from importlib import import_module
from moulinette import m18n, msettings from moulinette import m18n, Moulinette
from yunohost.utils.error import YunohostError, YunohostValidationError from yunohost.utils.error import YunohostError, YunohostValidationError
from moulinette.utils import log from moulinette.utils import log
from moulinette.utils.filesystem import read_json from moulinette.utils.filesystem import read_json
@ -416,7 +416,7 @@ def _hook_exec_bash(path, args, chdir, env, user, return_format, loggers):
env = {} env = {}
env["YNH_CWD"] = chdir env["YNH_CWD"] = chdir
env["YNH_INTERFACE"] = msettings.get("interface") env["YNH_INTERFACE"] = Moulinette.interface.type
stdreturn = os.path.join(tempfile.mkdtemp(), "stdreturn") stdreturn = os.path.join(tempfile.mkdtemp(), "stdreturn")
with open(stdreturn, "w") as f: with open(stdreturn, "w") as f:

View file

@ -33,7 +33,7 @@ import psutil
from datetime import datetime, timedelta from datetime import datetime, timedelta
from logging import FileHandler, getLogger, Formatter from logging import FileHandler, getLogger, Formatter
from moulinette import m18n, msettings from moulinette import m18n, Moulinette
from moulinette.core import MoulinetteError from moulinette.core import MoulinetteError
from yunohost.utils.error import YunohostError, YunohostValidationError from yunohost.utils.error import YunohostError, YunohostValidationError
from yunohost.utils.packages import get_ynh_package_version from yunohost.utils.packages import get_ynh_package_version
@ -44,7 +44,6 @@ CATEGORIES_PATH = "/var/log/yunohost/categories/"
OPERATIONS_PATH = "/var/log/yunohost/categories/operation/" OPERATIONS_PATH = "/var/log/yunohost/categories/operation/"
METADATA_FILE_EXT = ".yml" METADATA_FILE_EXT = ".yml"
LOG_FILE_EXT = ".log" LOG_FILE_EXT = ".log"
RELATED_CATEGORIES = ["app", "domain", "group", "service", "user"]
logger = getActionLogger("yunohost.log") logger = getActionLogger("yunohost.log")
@ -125,7 +124,7 @@ def log_list(limit=None, with_details=False, with_suboperations=False):
operations = list(reversed(sorted(operations, key=lambda o: o["name"]))) operations = list(reversed(sorted(operations, key=lambda o: o["name"])))
# Reverse the order of log when in cli, more comfortable to read (avoid # Reverse the order of log when in cli, more comfortable to read (avoid
# unecessary scrolling) # unecessary scrolling)
is_api = msettings.get("interface") == "api" is_api = Moulinette.interface.type == "api"
if not is_api: if not is_api:
operations = list(reversed(operations)) operations = list(reversed(operations))
@ -214,7 +213,7 @@ def log_show(
url = yunopaste(content) url = yunopaste(content)
logger.info(m18n.n("log_available_on_yunopaste", url=url)) logger.info(m18n.n("log_available_on_yunopaste", url=url))
if msettings.get("interface") == "api": if Moulinette.interface.type == "api":
return {"url": url} return {"url": url}
else: else:
return return
@ -609,7 +608,7 @@ class OperationLogger(object):
"operation": self.operation, "operation": self.operation,
"parent": self.parent, "parent": self.parent,
"yunohost_version": get_ynh_package_version("yunohost")["version"], "yunohost_version": get_ynh_package_version("yunohost")["version"],
"interface": msettings.get("interface"), "interface": Moulinette.interface.type,
} }
if self.related_to is not None: if self.related_to is not None:
data["related_to"] = self.related_to data["related_to"] = self.related_to
@ -663,7 +662,7 @@ class OperationLogger(object):
self.logger.removeHandler(self.file_handler) self.logger.removeHandler(self.file_handler)
self.file_handler.close() self.file_handler.close()
is_api = msettings.get("interface") == "api" is_api = Moulinette.interface.type == "api"
desc = _get_description_from_name(self.name) desc = _get_description_from_name(self.name)
if error is None: if error is None:
if is_api: if is_api:

View file

@ -3,7 +3,7 @@ import pytest
import sys import sys
import moulinette import moulinette
from moulinette import m18n, msettings from moulinette import m18n, Moulinette
from yunohost.utils.error import YunohostError from yunohost.utils.error import YunohostError
from contextlib import contextmanager from contextlib import contextmanager
@ -81,4 +81,11 @@ def pytest_cmdline_main(config):
import yunohost import yunohost
yunohost.init(debug=config.option.yunodebug) yunohost.init(debug=config.option.yunodebug)
msettings["interface"] = "test" class DummyInterface():
type = "test"
def prompt(*args, **kwargs):
raise NotImplementedError
Moulinette._interface = DummyInterface()

View file

@ -5,7 +5,7 @@ from mock import patch
from io import StringIO from io import StringIO
from collections import OrderedDict from collections import OrderedDict
from moulinette import msignals from moulinette import Moulinette
from yunohost import domain, user from yunohost import domain, user
from yunohost.app import _parse_args_in_yunohost_format, PasswordArgumentParser from yunohost.app import _parse_args_in_yunohost_format, PasswordArgumentParser
@ -84,7 +84,7 @@ def test_parse_args_in_yunohost_format_string_input():
answers = {} answers = {}
expected_result = OrderedDict({"some_string": ("some_value", "string")}) expected_result = OrderedDict({"some_string": ("some_value", "string")})
with patch.object(msignals, "prompt", return_value="some_value"): with patch.object(Moulinette.interface, "prompt", return_value="some_value"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -97,7 +97,7 @@ def test_parse_args_in_yunohost_format_string_input_no_ask():
answers = {} answers = {}
expected_result = OrderedDict({"some_string": ("some_value", "string")}) expected_result = OrderedDict({"some_string": ("some_value", "string")})
with patch.object(msignals, "prompt", return_value="some_value"): with patch.object(Moulinette.interface, "prompt", return_value="some_value"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -124,7 +124,7 @@ def test_parse_args_in_yunohost_format_string_optional_with_input():
answers = {} answers = {}
expected_result = OrderedDict({"some_string": ("some_value", "string")}) expected_result = OrderedDict({"some_string": ("some_value", "string")})
with patch.object(msignals, "prompt", return_value="some_value"): with patch.object(Moulinette.interface, "prompt", return_value="some_value"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -139,7 +139,7 @@ def test_parse_args_in_yunohost_format_string_optional_with_empty_input():
answers = {} answers = {}
expected_result = OrderedDict({"some_string": ("", "string")}) expected_result = OrderedDict({"some_string": ("", "string")})
with patch.object(msignals, "prompt", return_value=""): with patch.object(Moulinette.interface, "prompt", return_value=""):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -153,7 +153,7 @@ def test_parse_args_in_yunohost_format_string_optional_with_input_without_ask():
answers = {} answers = {}
expected_result = OrderedDict({"some_string": ("some_value", "string")}) expected_result = OrderedDict({"some_string": ("some_value", "string")})
with patch.object(msignals, "prompt", return_value="some_value"): with patch.object(Moulinette.interface, "prompt", return_value="some_value"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -180,7 +180,7 @@ def test_parse_args_in_yunohost_format_string_input_test_ask():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value="some_value") as prompt: with patch.object(Moulinette.interface, "prompt", return_value="some_value") as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
prompt.assert_called_with(ask_text, False) prompt.assert_called_with(ask_text, False)
@ -197,7 +197,7 @@ def test_parse_args_in_yunohost_format_string_input_test_ask_with_default():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value="some_value") as prompt: with patch.object(Moulinette.interface, "prompt", return_value="some_value") as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
prompt.assert_called_with("%s (default: %s)" % (ask_text, default_text), False) prompt.assert_called_with("%s (default: %s)" % (ask_text, default_text), False)
@ -215,7 +215,7 @@ def test_parse_args_in_yunohost_format_string_input_test_ask_with_example():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value="some_value") as prompt: with patch.object(Moulinette.interface, "prompt", return_value="some_value") as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
assert ask_text in prompt.call_args[0][0] assert ask_text in prompt.call_args[0][0]
assert example_text in prompt.call_args[0][0] assert example_text in prompt.call_args[0][0]
@ -234,7 +234,7 @@ def test_parse_args_in_yunohost_format_string_input_test_ask_with_help():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value="some_value") as prompt: with patch.object(Moulinette.interface, "prompt", return_value="some_value") as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
assert ask_text in prompt.call_args[0][0] assert ask_text in prompt.call_args[0][0]
assert help_text in prompt.call_args[0][0] assert help_text in prompt.call_args[0][0]
@ -251,7 +251,7 @@ def test_parse_args_in_yunohost_format_string_with_choice_prompt():
questions = [{"name": "some_string", "type": "string", "choices": ["fr", "en"]}] questions = [{"name": "some_string", "type": "string", "choices": ["fr", "en"]}]
answers = {"some_string": "fr"} answers = {"some_string": "fr"}
expected_result = OrderedDict({"some_string": ("fr", "string")}) expected_result = OrderedDict({"some_string": ("fr", "string")})
with patch.object(msignals, "prompt", return_value="fr"): with patch.object(Moulinette.interface, "prompt", return_value="fr"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -275,7 +275,7 @@ def test_parse_args_in_yunohost_format_string_with_choice_ask():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value="ru") as prompt: with patch.object(Moulinette.interface, "prompt", return_value="ru") as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
assert ask_text in prompt.call_args[0][0] assert ask_text in prompt.call_args[0][0]
@ -333,7 +333,7 @@ def test_parse_args_in_yunohost_format_password_input():
answers = {} answers = {}
expected_result = OrderedDict({"some_password": ("some_value", "password")}) expected_result = OrderedDict({"some_password": ("some_value", "password")})
with patch.object(msignals, "prompt", return_value="some_value"): with patch.object(Moulinette.interface, "prompt", return_value="some_value"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -347,7 +347,7 @@ def test_parse_args_in_yunohost_format_password_input_no_ask():
answers = {} answers = {}
expected_result = OrderedDict({"some_password": ("some_value", "password")}) expected_result = OrderedDict({"some_password": ("some_value", "password")})
with patch.object(msignals, "prompt", return_value="some_value"): with patch.object(Moulinette.interface, "prompt", return_value="some_value"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -383,7 +383,7 @@ def test_parse_args_in_yunohost_format_password_optional_with_input():
answers = {} answers = {}
expected_result = OrderedDict({"some_password": ("some_value", "password")}) expected_result = OrderedDict({"some_password": ("some_value", "password")})
with patch.object(msignals, "prompt", return_value="some_value"): with patch.object(Moulinette.interface, "prompt", return_value="some_value"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -399,7 +399,7 @@ def test_parse_args_in_yunohost_format_password_optional_with_empty_input():
answers = {} answers = {}
expected_result = OrderedDict({"some_password": ("", "password")}) expected_result = OrderedDict({"some_password": ("", "password")})
with patch.object(msignals, "prompt", return_value=""): with patch.object(Moulinette.interface, "prompt", return_value=""):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -414,7 +414,7 @@ def test_parse_args_in_yunohost_format_password_optional_with_input_without_ask(
answers = {} answers = {}
expected_result = OrderedDict({"some_password": ("some_value", "password")}) expected_result = OrderedDict({"some_password": ("some_value", "password")})
with patch.object(msignals, "prompt", return_value="some_value"): with patch.object(Moulinette.interface, "prompt", return_value="some_value"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -462,7 +462,7 @@ def test_parse_args_in_yunohost_format_password_input_test_ask():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value="some_value") as prompt: with patch.object(Moulinette.interface, "prompt", return_value="some_value") as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
prompt.assert_called_with(ask_text, True) prompt.assert_called_with(ask_text, True)
@ -481,7 +481,7 @@ def test_parse_args_in_yunohost_format_password_input_test_ask_with_example():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value="some_value") as prompt: with patch.object(Moulinette.interface, "prompt", return_value="some_value") as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
assert ask_text in prompt.call_args[0][0] assert ask_text in prompt.call_args[0][0]
assert example_text in prompt.call_args[0][0] assert example_text in prompt.call_args[0][0]
@ -501,7 +501,7 @@ def test_parse_args_in_yunohost_format_password_input_test_ask_with_help():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value="some_value") as prompt: with patch.object(Moulinette.interface, "prompt", return_value="some_value") as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
assert ask_text in prompt.call_args[0][0] assert ask_text in prompt.call_args[0][0]
assert help_text in prompt.call_args[0][0] assert help_text in prompt.call_args[0][0]
@ -594,7 +594,7 @@ def test_parse_args_in_yunohost_format_path_input():
answers = {} answers = {}
expected_result = OrderedDict({"some_path": ("some_value", "path")}) expected_result = OrderedDict({"some_path": ("some_value", "path")})
with patch.object(msignals, "prompt", return_value="some_value"): with patch.object(Moulinette.interface, "prompt", return_value="some_value"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -608,7 +608,7 @@ def test_parse_args_in_yunohost_format_path_input_no_ask():
answers = {} answers = {}
expected_result = OrderedDict({"some_path": ("some_value", "path")}) expected_result = OrderedDict({"some_path": ("some_value", "path")})
with patch.object(msignals, "prompt", return_value="some_value"): with patch.object(Moulinette.interface, "prompt", return_value="some_value"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -637,7 +637,7 @@ def test_parse_args_in_yunohost_format_path_optional_with_input():
answers = {} answers = {}
expected_result = OrderedDict({"some_path": ("some_value", "path")}) expected_result = OrderedDict({"some_path": ("some_value", "path")})
with patch.object(msignals, "prompt", return_value="some_value"): with patch.object(Moulinette.interface, "prompt", return_value="some_value"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -653,7 +653,7 @@ def test_parse_args_in_yunohost_format_path_optional_with_empty_input():
answers = {} answers = {}
expected_result = OrderedDict({"some_path": ("", "path")}) expected_result = OrderedDict({"some_path": ("", "path")})
with patch.object(msignals, "prompt", return_value=""): with patch.object(Moulinette.interface, "prompt", return_value=""):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -668,7 +668,7 @@ def test_parse_args_in_yunohost_format_path_optional_with_input_without_ask():
answers = {} answers = {}
expected_result = OrderedDict({"some_path": ("some_value", "path")}) expected_result = OrderedDict({"some_path": ("some_value", "path")})
with patch.object(msignals, "prompt", return_value="some_value"): with patch.object(Moulinette.interface, "prompt", return_value="some_value"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -697,7 +697,7 @@ def test_parse_args_in_yunohost_format_path_input_test_ask():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value="some_value") as prompt: with patch.object(Moulinette.interface, "prompt", return_value="some_value") as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
prompt.assert_called_with(ask_text, False) prompt.assert_called_with(ask_text, False)
@ -715,7 +715,7 @@ def test_parse_args_in_yunohost_format_path_input_test_ask_with_default():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value="some_value") as prompt: with patch.object(Moulinette.interface, "prompt", return_value="some_value") as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
prompt.assert_called_with("%s (default: %s)" % (ask_text, default_text), False) prompt.assert_called_with("%s (default: %s)" % (ask_text, default_text), False)
@ -734,7 +734,7 @@ def test_parse_args_in_yunohost_format_path_input_test_ask_with_example():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value="some_value") as prompt: with patch.object(Moulinette.interface, "prompt", return_value="some_value") as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
assert ask_text in prompt.call_args[0][0] assert ask_text in prompt.call_args[0][0]
assert example_text in prompt.call_args[0][0] assert example_text in prompt.call_args[0][0]
@ -754,7 +754,7 @@ def test_parse_args_in_yunohost_format_path_input_test_ask_with_help():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value="some_value") as prompt: with patch.object(Moulinette.interface, "prompt", return_value="some_value") as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
assert ask_text in prompt.call_args[0][0] assert ask_text in prompt.call_args[0][0]
assert help_text in prompt.call_args[0][0] assert help_text in prompt.call_args[0][0]
@ -918,11 +918,11 @@ def test_parse_args_in_yunohost_format_boolean_input():
answers = {} answers = {}
expected_result = OrderedDict({"some_boolean": (1, "boolean")}) expected_result = OrderedDict({"some_boolean": (1, "boolean")})
with patch.object(msignals, "prompt", return_value="y"): with patch.object(Moulinette.interface, "prompt", return_value="y"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
expected_result = OrderedDict({"some_boolean": (0, "boolean")}) expected_result = OrderedDict({"some_boolean": (0, "boolean")})
with patch.object(msignals, "prompt", return_value="n"): with patch.object(Moulinette.interface, "prompt", return_value="n"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -936,7 +936,7 @@ def test_parse_args_in_yunohost_format_boolean_input_no_ask():
answers = {} answers = {}
expected_result = OrderedDict({"some_boolean": (1, "boolean")}) expected_result = OrderedDict({"some_boolean": (1, "boolean")})
with patch.object(msignals, "prompt", return_value="y"): with patch.object(Moulinette.interface, "prompt", return_value="y"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -965,7 +965,7 @@ def test_parse_args_in_yunohost_format_boolean_optional_with_input():
answers = {} answers = {}
expected_result = OrderedDict({"some_boolean": (1, "boolean")}) expected_result = OrderedDict({"some_boolean": (1, "boolean")})
with patch.object(msignals, "prompt", return_value="y"): with patch.object(Moulinette.interface, "prompt", return_value="y"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -981,7 +981,7 @@ def test_parse_args_in_yunohost_format_boolean_optional_with_empty_input():
answers = {} answers = {}
expected_result = OrderedDict({"some_boolean": (0, "boolean")}) # default to false expected_result = OrderedDict({"some_boolean": (0, "boolean")}) # default to false
with patch.object(msignals, "prompt", return_value=""): with patch.object(Moulinette.interface, "prompt", return_value=""):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -996,7 +996,7 @@ def test_parse_args_in_yunohost_format_boolean_optional_with_input_without_ask()
answers = {} answers = {}
expected_result = OrderedDict({"some_boolean": (0, "boolean")}) expected_result = OrderedDict({"some_boolean": (0, "boolean")})
with patch.object(msignals, "prompt", return_value="n"): with patch.object(Moulinette.interface, "prompt", return_value="n"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -1039,7 +1039,7 @@ def test_parse_args_in_yunohost_format_boolean_input_test_ask():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value=0) as prompt: with patch.object(Moulinette.interface, "prompt", return_value=0) as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
prompt.assert_called_with(ask_text + " [yes | no] (default: no)", False) prompt.assert_called_with(ask_text + " [yes | no] (default: no)", False)
@ -1057,7 +1057,7 @@ def test_parse_args_in_yunohost_format_boolean_input_test_ask_with_default():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value=1) as prompt: with patch.object(Moulinette.interface, "prompt", return_value=1) as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
prompt.assert_called_with("%s [yes | no] (default: yes)" % ask_text, False) prompt.assert_called_with("%s [yes | no] (default: yes)" % ask_text, False)
@ -1193,11 +1193,11 @@ def test_parse_args_in_yunohost_format_domain_two_domains_default_input():
domain, "_get_maindomain", return_value=main_domain domain, "_get_maindomain", return_value=main_domain
), patch.object(domain, "domain_list", return_value={"domains": domains}): ), patch.object(domain, "domain_list", return_value={"domains": domains}):
expected_result = OrderedDict({"some_domain": (main_domain, "domain")}) expected_result = OrderedDict({"some_domain": (main_domain, "domain")})
with patch.object(msignals, "prompt", return_value=main_domain): with patch.object(Moulinette.interface, "prompt", return_value=main_domain):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
expected_result = OrderedDict({"some_domain": (other_domain, "domain")}) expected_result = OrderedDict({"some_domain": (other_domain, "domain")})
with patch.object(msignals, "prompt", return_value=other_domain): with patch.object(Moulinette.interface, "prompt", return_value=other_domain):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -1380,14 +1380,14 @@ def test_parse_args_in_yunohost_format_user_two_users_default_input():
with patch.object(user, "user_list", return_value={"users": users}): with patch.object(user, "user_list", return_value={"users": users}):
with patch.object(user, "user_info", return_value={}): with patch.object(user, "user_info", return_value={}):
expected_result = OrderedDict({"some_user": (username, "user")}) expected_result = OrderedDict({"some_user": (username, "user")})
with patch.object(msignals, "prompt", return_value=username): with patch.object(Moulinette.interface, "prompt", return_value=username):
assert ( assert (
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
== expected_result == expected_result
) )
expected_result = OrderedDict({"some_user": (other_user, "user")}) expected_result = OrderedDict({"some_user": (other_user, "user")})
with patch.object(msignals, "prompt", return_value=other_user): with patch.object(Moulinette.interface, "prompt", return_value=other_user):
assert ( assert (
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
== expected_result == expected_result
@ -1447,14 +1447,14 @@ def test_parse_args_in_yunohost_format_number_input():
answers = {} answers = {}
expected_result = OrderedDict({"some_number": (1337, "number")}) expected_result = OrderedDict({"some_number": (1337, "number")})
with patch.object(msignals, "prompt", return_value="1337"): with patch.object(Moulinette.interface, "prompt", return_value="1337"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
with patch.object(msignals, "prompt", return_value=1337): with patch.object(Moulinette.interface, "prompt", return_value=1337):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
expected_result = OrderedDict({"some_number": (0, "number")}) expected_result = OrderedDict({"some_number": (0, "number")})
with patch.object(msignals, "prompt", return_value=""): with patch.object(Moulinette.interface, "prompt", return_value=""):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -1468,7 +1468,7 @@ def test_parse_args_in_yunohost_format_number_input_no_ask():
answers = {} answers = {}
expected_result = OrderedDict({"some_number": (1337, "number")}) expected_result = OrderedDict({"some_number": (1337, "number")})
with patch.object(msignals, "prompt", return_value="1337"): with patch.object(Moulinette.interface, "prompt", return_value="1337"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -1497,7 +1497,7 @@ def test_parse_args_in_yunohost_format_number_optional_with_input():
answers = {} answers = {}
expected_result = OrderedDict({"some_number": (1337, "number")}) expected_result = OrderedDict({"some_number": (1337, "number")})
with patch.object(msignals, "prompt", return_value="1337"): with patch.object(Moulinette.interface, "prompt", return_value="1337"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -1512,7 +1512,7 @@ def test_parse_args_in_yunohost_format_number_optional_with_input_without_ask():
answers = {} answers = {}
expected_result = OrderedDict({"some_number": (0, "number")}) expected_result = OrderedDict({"some_number": (0, "number")})
with patch.object(msignals, "prompt", return_value="0"): with patch.object(Moulinette.interface, "prompt", return_value="0"):
assert _parse_args_in_yunohost_format(answers, questions) == expected_result assert _parse_args_in_yunohost_format(answers, questions) == expected_result
@ -1555,7 +1555,7 @@ def test_parse_args_in_yunohost_format_number_input_test_ask():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value="1111") as prompt: with patch.object(Moulinette.interface, "prompt", return_value="1111") as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
prompt.assert_called_with("%s (default: 0)" % (ask_text), False) prompt.assert_called_with("%s (default: 0)" % (ask_text), False)
@ -1573,7 +1573,7 @@ def test_parse_args_in_yunohost_format_number_input_test_ask_with_default():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value="1111") as prompt: with patch.object(Moulinette.interface, "prompt", return_value="1111") as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
prompt.assert_called_with("%s (default: %s)" % (ask_text, default_value), False) prompt.assert_called_with("%s (default: %s)" % (ask_text, default_value), False)
@ -1592,7 +1592,7 @@ def test_parse_args_in_yunohost_format_number_input_test_ask_with_example():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value="1111") as prompt: with patch.object(Moulinette.interface, "prompt", return_value="1111") as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
assert ask_text in prompt.call_args[0][0] assert ask_text in prompt.call_args[0][0]
assert example_value in prompt.call_args[0][0] assert example_value in prompt.call_args[0][0]
@ -1612,7 +1612,7 @@ def test_parse_args_in_yunohost_format_number_input_test_ask_with_help():
] ]
answers = {} answers = {}
with patch.object(msignals, "prompt", return_value="1111") as prompt: with patch.object(Moulinette.interface, "prompt", return_value="1111") as prompt:
_parse_args_in_yunohost_format(answers, questions) _parse_args_in_yunohost_format(answers, questions)
assert ask_text in prompt.call_args[0][0] assert ask_text in prompt.call_args[0][0]
assert help_value in prompt.call_args[0][0] assert help_value in prompt.call_args[0][0]

View file

@ -0,0 +1,58 @@
import pytest
import os
from yunohost.authenticators.ldap_admin import Authenticator as LDAPAuth
from yunohost.tools import tools_adminpw
from moulinette import m18n
from moulinette.core import MoulinetteError
def setup_function(function):
if os.system("systemctl is-active slapd") != 0:
os.system("systemctl start slapd && sleep 3")
tools_adminpw("yunohost", check_strength=False)
def test_authenticate():
LDAPAuth().authenticate_credentials(credentials="yunohost")
def test_authenticate_with_wrong_password():
with pytest.raises(MoulinetteError) as exception:
LDAPAuth().authenticate_credentials(credentials="bad_password_lul")
translation = m18n.g("invalid_password")
expected_msg = translation.format()
assert expected_msg in str(exception)
def test_authenticate_server_down(mocker):
os.system("systemctl stop slapd && sleep 3")
# Now if slapd is down, moulinette tries to restart it
mocker.patch("os.system")
mocker.patch("time.sleep")
with pytest.raises(MoulinetteError) as exception:
LDAPAuth().authenticate_credentials(credentials="yunohost")
translation = m18n.n("ldap_server_down")
expected_msg = translation.format()
assert expected_msg in str(exception)
def test_authenticate_change_password():
LDAPAuth().authenticate_credentials(credentials="yunohost")
tools_adminpw("plopette", check_strength=False)
with pytest.raises(MoulinetteError) as exception:
LDAPAuth().authenticate_credentials(credentials="yunohost")
translation = m18n.g("invalid_password")
expected_msg = translation.format()
assert expected_msg in str(exception)
LDAPAuth().authenticate_credentials(credentials="plopette")

View file

@ -30,7 +30,7 @@ import time
from importlib import import_module from importlib import import_module
from packaging import version from packaging import version
from moulinette import msignals, m18n from moulinette import Moulinette, m18n
from moulinette.utils.log import getActionLogger from moulinette.utils.log import getActionLogger
from moulinette.utils.process import check_output, call_async_output from moulinette.utils.process import check_output, call_async_output
from moulinette.utils.filesystem import read_yaml, write_to_yaml from moulinette.utils.filesystem import read_yaml, write_to_yaml
@ -692,7 +692,7 @@ def tools_shutdown(operation_logger, force=False):
if not shutdown: if not shutdown:
try: try:
# Ask confirmation for server shutdown # Ask confirmation for server shutdown
i = msignals.prompt(m18n.n("server_shutdown_confirm", answers="y/N")) i = Moulinette.prompt(m18n.n("server_shutdown_confirm", answers="y/N"))
except NotImplemented: except NotImplemented:
pass pass
else: else:
@ -711,7 +711,7 @@ def tools_reboot(operation_logger, force=False):
if not reboot: if not reboot:
try: try:
# Ask confirmation for restoring # Ask confirmation for restoring
i = msignals.prompt(m18n.n("server_reboot_confirm", answers="y/N")) i = Moulinette.prompt(m18n.n("server_reboot_confirm", answers="y/N"))
except NotImplemented: except NotImplemented:
pass pass
else: else:

View file

@ -33,7 +33,7 @@ import string
import subprocess import subprocess
import copy import copy
from moulinette import msignals, msettings, m18n from moulinette import Moulinette, m18n
from moulinette.utils.log import getActionLogger from moulinette.utils.log import getActionLogger
from moulinette.utils.process import check_output from moulinette.utils.process import check_output
@ -117,18 +117,18 @@ def user_create(
# Validate domain used for email address/xmpp account # Validate domain used for email address/xmpp account
if domain is None: if domain is None:
if msettings.get("interface") == "api": if Moulinette.interface.type == "api":
raise YunohostValidationError( raise YunohostValidationError(
"Invalid usage, you should specify a domain argument" "Invalid usage, you should specify a domain argument"
) )
else: else:
# On affiche les differents domaines possibles # On affiche les differents domaines possibles
msignals.display(m18n.n("domains_available")) Moulinette.display(m18n.n("domains_available"))
for domain in domain_list()["domains"]: for domain in domain_list()["domains"]:
msignals.display("- {}".format(domain)) Moulinette.display("- {}".format(domain))
maindomain = _get_maindomain() maindomain = _get_maindomain()
domain = msignals.prompt( domain = Moulinette.prompt(
m18n.n("ask_user_domain") + " (default: %s)" % maindomain m18n.n("ask_user_domain") + " (default: %s)" % maindomain
) )
if not domain: if not domain:
@ -379,8 +379,8 @@ def user_update(
# when in the cli interface if the option to change the password is called # when in the cli interface if the option to change the password is called
# without a specified value, change_password will be set to the const 0. # without a specified value, change_password will be set to the const 0.
# In this case we prompt for the new password. # In this case we prompt for the new password.
if msettings.get("interface") == "cli" and not change_password: if Moulinette.interface.type == "cli" and not change_password:
change_password = msignals.prompt(m18n.n("ask_password"), True, True) change_password = Moulinette.prompt(m18n.n("ask_password"), True, True)
# Ensure sufficiently complex password # Ensure sufficiently complex password
assert_password_is_strong_enough("user", change_password) assert_password_is_strong_enough("user", change_password)

View file

@ -1,5 +1,4 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" License """ License
Copyright (C) 2019 YunoHost Copyright (C) 2019 YunoHost
@ -21,10 +20,18 @@
import os import os
import atexit import atexit
from moulinette.core import MoulinetteLdapIsDownError import logging
from moulinette.authenticators import ldap import ldap
import ldap.sasl
import time
import ldap.modlist as modlist
from moulinette import m18n
from moulinette.core import MoulinetteError
from yunohost.utils.error import YunohostError from yunohost.utils.error import YunohostError
logger = logging.getLogger("yunohost.utils.ldap")
# We use a global variable to do some caching # We use a global variable to do some caching
# to avoid re-authenticating in case we call _get_ldap_authenticator multiple times # to avoid re-authenticating in case we call _get_ldap_authenticator multiple times
_ldap_interface = None _ldap_interface = None
@ -35,51 +42,21 @@ def _get_ldap_interface():
global _ldap_interface global _ldap_interface
if _ldap_interface is None: if _ldap_interface is None:
_ldap_interface = LDAPInterface()
conf = {
"vendor": "ldap",
"name": "as-root",
"parameters": {
"uri": "ldapi://%2Fvar%2Frun%2Fslapd%2Fldapi",
"base_dn": "dc=yunohost,dc=org",
"user_rdn": "gidNumber=0+uidNumber=0,cn=peercred,cn=external,cn=auth",
},
"extra": {},
}
try:
_ldap_interface = ldap.Authenticator(**conf)
except MoulinetteLdapIsDownError:
raise YunohostError(
"Service slapd is not running but is required to perform this action ... You can try to investigate what's happening with 'systemctl status slapd'"
)
assert_slapd_is_running()
return _ldap_interface return _ldap_interface
def assert_slapd_is_running():
# Assert slapd is running...
if not os.system("pgrep slapd >/dev/null") == 0:
raise YunohostError(
"Service slapd is not running but is required to perform this action ... You can try to investigate what's happening with 'systemctl status slapd'"
)
# We regularly want to extract stuff like 'bar' in ldap path like # We regularly want to extract stuff like 'bar' in ldap path like
# foo=bar,dn=users.example.org,ou=example.org,dc=org so this small helper allow # foo=bar,dn=users.example.org,ou=example.org,dc=org so this small helper allow
# to do this without relying of dozens of mysterious string.split()[0] # to do this without relying of dozens of mysterious string.split()[0]
# #
# e.g. using _ldap_path_extract(path, "foo") on the previous example will # e.g. using _ldap_path_extract(path, "foo") on the previous example will
# return bar # return bar
def _ldap_path_extract(path, info): def _ldap_path_extract(path, info):
for element in path.split(","): for element in path.split(","):
if element.startswith(info + "="): if element.startswith(info + "="):
return element[len(info + "=") :] return element[len(info + "="):]
# Add this to properly close / delete the ldap interface / authenticator # Add this to properly close / delete the ldap interface / authenticator
@ -93,3 +70,247 @@ def _destroy_ldap_interface():
atexit.register(_destroy_ldap_interface) atexit.register(_destroy_ldap_interface)
class LDAPInterface():
def __init__(self):
logger.debug("initializing ldap interface")
self.uri = "ldapi://%2Fvar%2Frun%2Fslapd%2Fldapi"
self.basedn = "dc=yunohost,dc=org"
self.rootdn = "gidNumber=0+uidNumber=0,cn=peercred,cn=external,cn=auth"
self.connect()
def connect(self):
def _reconnect():
con = ldap.ldapobject.ReconnectLDAPObject(
self.uri, retry_max=10, retry_delay=0.5
)
con.sasl_non_interactive_bind_s("EXTERNAL")
return con
try:
con = _reconnect()
except ldap.SERVER_DOWN:
# ldap is down, attempt to restart it before really failing
logger.warning(m18n.n("ldap_server_is_down_restart_it"))
os.system("systemctl restart slapd")
time.sleep(10) # waits 10 secondes so we are sure that slapd has restarted
try:
con = _reconnect()
except ldap.SERVER_DOWN:
raise YunohostError(
"Service slapd is not running but is required to perform this action ... "
"You can try to investigate what's happening with 'systemctl status slapd'"
)
# Check that we are indeed logged in with the right identity
try:
# whoami_s return dn:..., then delete these 3 characters
who = con.whoami_s()[3:]
except Exception as e:
logger.warning("Error during ldap authentication process: %s", e)
raise
else:
if who != self.rootdn:
raise MoulinetteError("Not logged in with the expected userdn ?!")
else:
self.con = con
def __del__(self):
"""Disconnect and free ressources"""
if hasattr(self, "con") and self.con:
self.con.unbind_s()
def search(self, base=None, filter="(objectClass=*)", attrs=["dn"]):
"""Search in LDAP base
Perform an LDAP search operation with given arguments and return
results as a list.
Keyword arguments:
- base -- The dn to search into
- filter -- A string representation of the filter to apply
- attrs -- A list of attributes to fetch
Returns:
A list of all results
"""
if not base:
base = self.basedn
try:
result = self.con.search_s(base, ldap.SCOPE_SUBTREE, filter, attrs)
except Exception as e:
raise MoulinetteError(
"error during LDAP search operation with: base='%s', "
"filter='%s', attrs=%s and exception %s" % (base, filter, attrs, e),
raw_msg=True,
)
result_list = []
if not attrs or "dn" not in attrs:
result_list = [entry for dn, entry in result]
else:
for dn, entry in result:
entry["dn"] = [dn]
result_list.append(entry)
def decode(value):
if isinstance(value, bytes):
value = value.decode("utf-8")
return value
# result_list is for example :
# [{'virtualdomain': [b'test.com']}, {'virtualdomain': [b'yolo.test']},
for stuff in result_list:
if isinstance(stuff, dict):
for key, values in stuff.items():
stuff[key] = [decode(v) for v in values]
return result_list
def add(self, rdn, attr_dict):
"""
Add LDAP entry
Keyword arguments:
rdn -- DN without domain
attr_dict -- Dictionnary of attributes/values to add
Returns:
Boolean | MoulinetteError
"""
dn = rdn + "," + self.basedn
ldif = modlist.addModlist(attr_dict)
for i, (k, v) in enumerate(ldif):
if isinstance(v, list):
v = [a.encode("utf-8") for a in v]
elif isinstance(v, str):
v = [v.encode("utf-8")]
ldif[i] = (k, v)
try:
self.con.add_s(dn, ldif)
except Exception as e:
raise MoulinetteError(
"error during LDAP add operation with: rdn='%s', "
"attr_dict=%s and exception %s" % (rdn, attr_dict, e),
raw_msg=True,
)
else:
return True
def remove(self, rdn):
"""
Remove LDAP entry
Keyword arguments:
rdn -- DN without domain
Returns:
Boolean | MoulinetteError
"""
dn = rdn + "," + self.basedn
try:
self.con.delete_s(dn)
except Exception as e:
raise MoulinetteError(
"error during LDAP delete operation with: rdn='%s' and exception %s"
% (rdn, e),
raw_msg=True,
)
else:
return True
def update(self, rdn, attr_dict, new_rdn=False):
"""
Modify LDAP entry
Keyword arguments:
rdn -- DN without domain
attr_dict -- Dictionnary of attributes/values to add
new_rdn -- New RDN for modification
Returns:
Boolean | MoulinetteError
"""
dn = rdn + "," + self.basedn
actual_entry = self.search(base=dn, attrs=None)
ldif = modlist.modifyModlist(actual_entry[0], attr_dict, ignore_oldexistent=1)
if ldif == []:
logger.debug("Nothing to update in LDAP")
return True
try:
if new_rdn:
self.con.rename_s(dn, new_rdn)
new_base = dn.split(",", 1)[1]
dn = new_rdn + "," + new_base
for i, (a, k, vs) in enumerate(ldif):
if isinstance(vs, list):
vs = [v.encode("utf-8") for v in vs]
elif isinstance(vs, str):
vs = [vs.encode("utf-8")]
ldif[i] = (a, k, vs)
self.con.modify_ext_s(dn, ldif)
except Exception as e:
raise MoulinetteError(
"error during LDAP update operation with: rdn='%s', "
"attr_dict=%s, new_rdn=%s and exception: %s"
% (rdn, attr_dict, new_rdn, e),
raw_msg=True,
)
else:
return True
def validate_uniqueness(self, value_dict):
"""
Check uniqueness of values
Keyword arguments:
value_dict -- Dictionnary of attributes/values to check
Returns:
Boolean | MoulinetteError
"""
attr_found = self.get_conflict(value_dict)
if attr_found:
logger.info(
"attribute '%s' with value '%s' is not unique",
attr_found[0],
attr_found[1],
)
raise MoulinetteError(
"ldap_attribute_already_exists",
attribute=attr_found[0],
value=attr_found[1],
)
return True
def get_conflict(self, value_dict, base_dn=None):
"""
Check uniqueness of values
Keyword arguments:
value_dict -- Dictionnary of attributes/values to check
Returns:
None | tuple with Fist conflict attribute name and value
"""
for attr, value in value_dict.items():
if not self.search(base=base_dn, filter=attr + "=" + value):
continue
else:
return (attr, value)
return None

View file

@ -35,6 +35,7 @@ def find_expected_string_keys():
python_files = glob.glob("src/yunohost/*.py") python_files = glob.glob("src/yunohost/*.py")
python_files.extend(glob.glob("src/yunohost/utils/*.py")) python_files.extend(glob.glob("src/yunohost/utils/*.py"))
python_files.extend(glob.glob("src/yunohost/data_migrations/*.py")) python_files.extend(glob.glob("src/yunohost/data_migrations/*.py"))
python_files.extend(glob.glob("src/yunohost/authenticators/*.py"))
python_files.extend(glob.glob("data/hooks/diagnosis/*.py")) python_files.extend(glob.glob("data/hooks/diagnosis/*.py"))
python_files.append("bin/yunohost") python_files.append("bin/yunohost")