Merge branch 'stretch-unstable' into update_doc_for_group_and_permission

This commit is contained in:
Josue-T 2019-07-25 00:21:00 +02:00 committed by GitHub
commit eeb880ac9a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
42 changed files with 764 additions and 734 deletions

View file

@ -1,5 +1,7 @@
language: python
install: "pip install pep8"
install: pip install tox pep8
python:
- "2.7"
script: "pep8 --ignore E501,E128,E731 moulinette"
- 2.7
script:
- pep8 moulinette
- tox

View file

@ -13,7 +13,7 @@ _global:
parameters:
uri: ldap://localhost:389
base_dn: dc=yunohost,dc=org
user_rdn: cn=admin
user_rdn: cn=admin,dc=yunohost,dc=org
ldap-anonymous:
vendor: ldap
parameters:
@ -25,7 +25,14 @@ _global:
parameters:
uri: ldap://localhost:389
base_dn: dc=yunohost,dc=org
user_rdn: cn=admin
user_rdn: cn=admin,dc=yunohost,dc=org
as-root:
vendor: ldap
parameters:
# We can get this uri by (urllib.quote_plus('/var/run/slapd/ldapi')
uri: ldapi://%2Fvar%2Frun%2Fslapd%2Fldapi
base_dn: dc=yunohost,dc=org
user_rdn: gidNumber=0+uidNumber=0,cn=peercred,cn=external,cn=auth
argument_auth: true
lock: false
@ -52,6 +59,11 @@ test:
configuration:
authenticate:
- cli
root-auth:
api: GET /test/root-auth
configuration:
authenticate: all
authenticator: as-root
anonymous:
api: GET /test/anon
configuration:

67
debian/changelog vendored
View file

@ -1,3 +1,70 @@
moulinette (3.6.4) stable; urgency=low
Bumping version number for stable release
-- Alexandre Aubin <alex.aubin@mailoo.org> Tue, 04 Jul 2019 23:30:00 +0000
moulinette (3.6.1) testing; urgency=low
- [enh] Add LDIF parsing utility (#201)
Thanks to all contributors <3 ! (Josué)
-- Alexandre Aubin <alex.aubin@mailoo.org> Tue, 04 Jun 2019 13:20:00 +0000
moulinette (3.6.0) testing; urgency=low
- [enh] Allow to use SASL authentication for LDAP (by root user) (#183)
- [i18n] Improve translation for Spanish
Thanks to all contributors (Josue, advocatux) <3 !
-- Alexandre Aubin <alex.aubin@mailoo.org> Wed, 22 May 2019 19:45:00 +0000
moulinette (3.5.2) stable; urgency=low
- Release as stable !
- [fix] Do not miserably crash if the lock does not exist when attempting to release it
- [i18n] Update translation for Arabic, Italian
Thanks to all contributors (Aleks, BoF, silkevicious) <3 !
-- Alexandre Aubin <alex.aubin@mailoo.org> Wed, 10 Apr 2019 02:14:00 +0000
moulinette (3.5.1) testing; urgency=low
* [fix] Fix case where stdinfo is not provided in call_async_output (0a300e5)
* [i18n] Improve translation for Greek, Hungarian, Polish, Swedish, French, Catalan, Occitan
Thanks to all contributors (Aleks, ariasuni, Quenti, ppr, Xaloc) <3 !
-- Alexandre Aubin <alex.aubin@mailoo.org> Wed, 03 Apr 2019 02:25:00 +0000
moulinette (3.5.0) testing; urgency=low
* [i18n] Improve Russian and Chinese (Mandarin) translations
Contributors : n3uz, Алексей
-- Alexandre Aubin <alex.aubin@mailoo.org> Wed, 13 Mar 2019 17:20:00 +0000
moulinette (3.4.2) stable; urgency=low
* [i18n] Improve Basque translation
Thanks to all contributors (A. Garaialde) <3 !
-- Alexandre Aubin <alex.aubin@mailoo.org> Tue, 29 Jan 2019 16:50:00 +0000
moulinette (3.4.1) testing; urgency=low
* [i18n] Improve Chinese(Mandarin) translation
* [i18n] Misc orthotypograhy
Thanks to all contributors (Jibec, aleiyer) <3 !
-- Alexandre Aubin <alex.aubin@mailoo.org> Thu, 17 Jan 2019 21:50:00 +0000
moulinette (3.4.0) testing; urgency=low
* [fix] Code cleaning with autopep8 (#187)

1
debian/control vendored
View file

@ -16,6 +16,7 @@ Depends: ${misc:Depends}, ${python:Depends},
python-gnupg,
python-gevent-websocket,
python-argcomplete,
python-toml,
python-psutil,
python-tz
Replaces: yunohost-cli

View file

@ -470,6 +470,19 @@ Here how it looks like for domain and user:
.. automethod:: moulinette.authenticators.ldap.Authenticator.remove
Reading LDIF file
=================
Reading parsing a ldif to be able to insert in the LDAP database is really easy. Here is how to get the content of a LDIF file
::
from moulinette.utils.filesystem import read_ldif
my_reslut = read_ldif("your_file.ldif")
Note that the main difference of what the auth object return with the search method is that this function return a 2-tuples with the "dn" and the LDAP entry.
=============================
LDAP architecture in Yunohost

View file

@ -4,6 +4,7 @@ File system operation utils
.. autofunction:: moulinette.utils.filesystem.read_file
.. autofunction:: moulinette.utils.filesystem.read_json
.. autofunction:: moulinette.utils.filesystem.read_yaml
.. autofunction:: moulinette.utils.filesystem.read_toml
.. autofunction:: moulinette.utils.filesystem.write_to_file
.. autofunction:: moulinette.utils.filesystem.append_to_file
.. autofunction:: moulinette.utils.filesystem.write_to_json

View file

@ -1,19 +0,0 @@
def test_non_auth():
return {'action': 'non-auth'}
def test_auth(auth):
return {'action': 'auth',
'authenticator': 'default', 'authenticate': 'all'}
def test_auth_profile(auth):
return {'action': 'auth-profile',
'authenticator': 'test-profile', 'authenticate': 'all'}
def test_auth_cli():
return {'action': 'auth-cli',
'authenticator': 'default', 'authenticate': ['cli']}
def test_anonymous():
return {'action': 'anonymous',
'authenticator': 'ldap-anonymous', 'authenticate': 'all'}

View file

@ -45,10 +45,11 @@
"error_removing": "خطأ أثناء عملية حذف {path:s}: {error:s}",
"error_changing_file_permissions": "خطأ أثناء عملية تعديل التصريحات لـ {path:s}: {error:s}",
"invalid_url": "خطأ في عنوان الرابط {url:s} (هل هذا الموقع موجود حقًا ؟)",
"download_ssl_error": "خطأ في الإتصال الآمن عبر الـ SSL أثناء محاولة الإتصال بـ {url:s}",
"download_ssl_error": "خطأ في الاتصال الآمن عبر الـ SSL أثناء محاولة الربط بـ {url:s}",
"download_timeout": "{url:s} استغرق مدة طويلة جدا للإستجابة، فتوقّف.",
"download_unknown_error": "خطأ أثناء عملية تنزيل البيانات مِن {url:s} : {error:s}",
"download_bad_status_code": "{url:s} أعاد رمز الحالة {code:s}",
"command_unknown": "الأمر '{command:s}' غير معروف ؟",
"corrupted_yaml": "قراءة مُشوّهة لنسق yaml مِن {ressource:s} (السبب : {error:s})"
"corrupted_yaml": "قراءة مُشوّهة لنسق yaml مِن {ressource:s} (السبب : {error:s})",
"info": "معلومة:"
}

1
locales/bn_BD.json Normal file
View file

@ -0,0 +1 @@
{}

View file

@ -50,5 +50,6 @@
"download_timeout": "{url:s} ha tardat massa en respondre, s'ha deixat d'esperar.",
"download_unknown_error": "Error al baixar dades des de {url:s}: {error:s}",
"download_bad_status_code": "{url:s} ha retornat el codi d'estat {code:s}",
"command_unknown": "Ordre '{command:s}' desconegut ?"
"command_unknown": "Ordre '{command:s}' desconegut ?",
"info": "Info:"
}

54
locales/cmn.json Normal file
View file

@ -0,0 +1,54 @@
{
"argument_required": "{argument}是必须的",
"authentication_profile_required": "必须验证配置文件{profile}",
"authentication_required": "需要验证",
"authentication_required_long": "此操作需要验证",
"colon": "{} ",
"confirm": "确认{prompt}",
"deprecated_command": "{prog}{command}已经放弃使用,将来会删除",
"deprecated_command_alias": "{prog}{old}已经放弃使用,将来会删除,请使用{prog}{new}代替",
"error": "错误:",
"error_see_log": "发生错误。请参看日志文件获取错误详情,日志文件位于 /var/log/yunohost/。",
"file_exists": "文件已存在:{path}",
"file_not_exist": "文件不存在:{path}",
"folder_exists": "目录已存在:{path}",
"folder_not_exist": "目录不存在",
"info": "信息:",
"instance_already_running": "实例已正在运行",
"invalid_argument": "参数错误{argument}{error}",
"invalid_password": "密码错误",
"invalid_usage": "用法错误,输入 --help 查看帮助信息",
"ldap_attribute_already_exists": "参数{attribute}已赋值{value}",
"ldap_operation_error": "LDAP操作时发生了错误",
"ldap_server_down": "无法连接LDAP服务器",
"logged_in": "登录成功",
"logged_out": "注销成功",
"not_logged_in": "您未登录",
"operation_interrupted": "操作中断",
"password": "密码",
"pattern_not_match": "模式匹配失败",
"root_required": "必须以root身份进行此操作",
"server_already_running": "服务已运行在指定端口",
"success": "成功!",
"unable_authenticate": "认证失败",
"unable_retrieve_session": "获取会话失败",
"unknown_group": "未知组{group}",
"unknown_user": "未知用户{user}",
"values_mismatch": "值不匹配",
"warning": "警告:",
"websocket_request_expected": "期望一个WebSocket请求",
"cannot_open_file": "不能打开文件{file:s}(原因:{error:s}",
"cannot_write_file": "写入文件{file:s}失败(原因:{error:s}",
"unknown_error_reading_file": "尝试读取文件{file:s}时发生错误",
"corrupted_json": "json数据{ressource:s}读取失败(原因:{error:s}",
"corrupted_yaml": "读取yaml文件{ressource:s}失败(原因:{error:s}",
"error_writing_file": "写入文件{file:s}失败:{error:s}",
"error_removing": "删除路径{path:s}失败:{error:s}",
"error_changing_file_permissions": "目录{path:s}权限修改失败:{error:s}",
"invalid_url": "url{url:s}无效site是否存在",
"download_ssl_error": "连接{url:s}时发生SSL错误",
"download_timeout": "{url:s}响应超时,放弃。",
"download_unknown_error": "下载{url:s}失败:{error:s}",
"download_bad_status_code": "{url:s}返回状态码:{code:s}",
"command_unknown": "未知命令:{command:s}"
}

1
locales/el.json Normal file
View file

@ -0,0 +1 @@
{}

View file

@ -40,16 +40,17 @@
"websocket_request_expected": "Expected a WebSocket request",
"cannot_open_file": "Could not open file {file:s} (reason: {error:s})",
"cannot_write_file": "Could not write file {file:s} (reason: {error:s})",
"unknown_error_reading_file": "Unknown error while trying to read file {file:s}",
"unknown_error_reading_file": "Unknown error while trying to read file {file:s} (reason: {error:s})",
"corrupted_json": "Corrupted json read from {ressource:s} (reason: {error:s})",
"corrupted_yaml": "Corrupted yaml read from {ressource:s} (reason: {error:s})",
"corrupted_toml": "Corrupted toml read from {ressource:s} (reason: {error:s})",
"error_writing_file": "Error when writing file {file:s}: {error:s}",
"error_removing": "Error when removing {path:s}: {error:s}",
"error_changing_file_permissions": "Error when changing permissions for {path:s}: {error:s}",
"invalid_url": "Invalid url {url:s} (does this site exists ?)",
"invalid_url": "Invalid url {url:s} (does this site exists?)",
"download_ssl_error": "SSL error when connecting to {url:s}",
"download_timeout": "{url:s} took too long to answer, gave up.",
"download_unknown_error": "Error when downloading data from {url:s} : {error:s}",
"download_unknown_error": "Error when downloading data from {url:s}: {error:s}",
"download_bad_status_code": "{url:s} returned status code {code:s}",
"command_unknown": "Command '{command:s}' unknown ?"
"command_unknown": "Command '{command:s}' unknown?"
}

View file

@ -50,5 +50,6 @@
"download_unknown_error": "Error al descargar datos desde {url:s} : {error:s}",
"download_bad_status_code": "{url:s} devolvió el código de estado {code:s}",
"command_unknown": "Comando '{command:s}' desconocido ?",
"corrupted_yaml": "yaml corrupto leido desde {ressource:s} (motivo: {error:s})"
"corrupted_yaml": "yaml corrupto leido desde {ressource:s} (motivo: {error:s})",
"info": "Información:"
}

3
locales/eu.json Normal file
View file

@ -0,0 +1,3 @@
{
"argument_required": "'{argument}' argumentua beharrezkoa da"
}

View file

@ -1,23 +1,23 @@
{
"argument_required": "Largument « {argument} » est requis",
"authentication_profile_required": "Lauthentification au profil « {profile} » requise",
"argument_required": "Largument '{argument}' est requis",
"authentication_profile_required": "Lauthentification au profil '{profile}' est requise",
"authentication_required": "Authentification requise",
"authentication_required_long": "Lauthentification est requise pour exécuter cette action",
"colon": "{} : ",
"confirm": "Confirmez : {prompt}",
"deprecated_command": "« {prog} {command} » est déprécié et sera bientôt supprimé",
"deprecated_command_alias": "« {prog} {old} » est déprécié et sera bientôt supprimé, utilisez « {prog} {new} » à la place",
"deprecated_command": "'{prog} {command}' est déprécié et sera bientôt supprimé",
"deprecated_command_alias": "'{prog} {old}' est déprécié et sera bientôt supprimé, utilisez '{prog} {new}' à la place",
"error": "Erreur :",
"error_see_log": "Une erreur est survenue. Veuillez consulter les journaux pour plus de détails, ils sont situés en /var/log/yunohost/.",
"file_exists": "Le fichier existe déjà : « {path} »",
"file_not_exist": "Le fichier « {path} » nexiste pas",
"folder_exists": "Le dossier existe déjà : « {path} »",
"error_see_log": "Une erreur est survenue. Veuillez consulter les journaux pour plus de détails, ils sont situés dans /var/log/yunohost/.",
"file_exists": "Le fichier existe déjà : '{path}'",
"file_not_exist": "Le fichier '{path}' nexiste pas",
"folder_exists": "Le dossier existe déjà : '{path}'",
"folder_not_exist": "Le dossier nexiste pas",
"instance_already_running": "Une instance est déjà en cours dexécution",
"invalid_argument": "Argument « {argument} » incorrect : {error}",
"invalid_argument": "Argument '{argument}' incorrect : {error}",
"invalid_password": "Mot de passe incorrect",
"invalid_usage": "Utilisation erronée, utilisez --help pour accéder à laide",
"ldap_attribute_already_exists": "Lattribut « {attribute} » existe déjà avec comme valeur : {value}",
"ldap_attribute_already_exists": "Lattribut '{attribute}' existe déjà avec la valeur suivante : '{value}'",
"ldap_operation_error": "Une erreur est survenue lors de lopération LDAP",
"ldap_server_down": "Impossible datteindre le serveur LDAP",
"logged_in": "Connecté",
@ -32,23 +32,24 @@
"success": "Succès !",
"unable_authenticate": "Impossible de vous authentifier",
"unable_retrieve_session": "Impossible de récupérer la session",
"unknown_group": "Groupe « {group} » inconnu",
"unknown_user": "Utilisateur « {user} » inconnu",
"unknown_group": "Groupe '{group}' inconnu",
"unknown_user": "L'utilisateur « {user} » est inconnu",
"values_mismatch": "Les valeurs ne correspondent pas",
"warning": "Attention :",
"websocket_request_expected": "Requête WebSocket attendue",
"cannot_open_file": "Impossible douvrir le fichier {file:s} (cause : {error:s})",
"cannot_write_file": "Ne peut pas écrire le fichier {file:s} (cause : {error:s})",
"websocket_request_expected": "Une requête WebSocket est attendue",
"cannot_open_file": "Impossible douvrir le fichier {file:s} (raison : {error:s})",
"cannot_write_file": "Ne peut pas écrire le fichier {file:s} (raison : {error:s})",
"unknown_error_reading_file": "Erreur inconnue en essayant de lire le fichier {file:s}",
"corrupted_json": "Json corrompu lu depuis {ressource:s} (cause : {error:s})",
"error_writing_file": "Erreur en écrivant le fichier {file:s}:{error:s}",
"error_removing": "Erreur lors de la suppression {path:s}:{error:s}",
"error_changing_file_permissions": "Erreur lors de la modification des autorisations pour {path:s}:{error:s}",
"invalid_url": "Url invalide {url:s} (ce site existe-t-il ?)",
"corrupted_json": "Fichier JSON corrompu en lecture depuis {ressource:s} (raison : {error:s})",
"error_writing_file": "Erreur en écrivant le fichier {file:s} : {error:s}",
"error_removing": "Erreur lors de la suppression {path:s} : {error:s}",
"error_changing_file_permissions": "Erreur lors de la modification des autorisations pour {path:s} : {error:s}",
"invalid_url": "URL {url:s} invalide : ce site existe-t-il ?",
"download_ssl_error": "Erreur SSL lors de la connexion à {url:s}",
"download_timeout": "{url:s} a pris trop de temps pour répondre, abandon.",
"download_unknown_error": "Erreur lors du téléchargement des données à partir de {url:s}:{error:s}",
"download_bad_status_code": "{url:s} code de statut renvoyé {code:s}",
"command_unknown": "Commande « {command:s} » inconnue ?",
"corrupted_yaml": "YAML corrompu lu {ressource:s} depuis (cause : {error:s})"
"download_timeout": "{url:s} a pris trop de temps pour répondre : abandon.",
"download_unknown_error": "Erreur lors du téléchargement des données à partir de {url:s} : {error:s}",
"download_bad_status_code": "{url:s} renvoie le code d'état {code:s}",
"command_unknown": "Commande '{command:s}' inconnue ?",
"corrupted_yaml": "Fichier YAML corrompu en lecture depuis {ressource:s} (raison : {error:s})",
"info": "Info :"
}

1
locales/hu.json Normal file
View file

@ -0,0 +1 @@
{}

View file

@ -50,5 +50,6 @@
"download_timeout": "{url:s} ci ha messo troppo a rispondere, abbandonato.",
"download_unknown_error": "Errore durante il download di dati da {url:s} : {error:s}",
"download_bad_status_code": "{url:s} ha restituito il codice di stato {code:s}",
"command_unknown": "Comando '{command:s}' sconosciuto ?"
"command_unknown": "Comando '{command:s}' sconosciuto ?",
"info": "Info:"
}

1
locales/nb_NO.json Normal file
View file

@ -0,0 +1 @@
{}

View file

@ -50,5 +50,6 @@
"download_bad_status_code": "{url:s} tòrna lo còdi destat {code:s}",
"command_unknown": "Comanda {command:s} desconeguda?",
"corrupted_json": "Fichièr Json corromput legit de {ressource:s} (rason: {error:s})",
"corrupted_yaml": "Fichièr YAML corromput legit de {ressource:s} (rason: {error:s})"
"corrupted_yaml": "Fichièr YAML corromput legit de {ressource:s} (rason: {error:s})",
"info": "Info:"
}

1
locales/pl.json Normal file
View file

@ -0,0 +1 @@
{}

View file

@ -1 +1,48 @@
{}
{
"argument_required": "Требуется'{argument}' аргумент",
"authentication_profile_required": "Для доступа к '{profile}' требуется аутентификация",
"authentication_required": "Требуется аутентификация",
"authentication_required_long": "Для этого действия требуется аутентификация",
"colon": "{}: ",
"confirm": "Подтвердить {prompt}",
"deprecated_command": "'{prog} {command}' устарела и будет удалена",
"deprecated_command_alias": "'{prog} {old}' устарела и будет удалена, вместо неё используйте '{prog} {new}'",
"error": "Ошибка:",
"error_see_log": "Произошла ошибка. Пожалуйста, смотри подробности в логах, находящихся /var/log/yunohost/.",
"file_exists": "Файл уже существует: '{path}'",
"file_not_exist": "Файл не существует: '{path}'",
"folder_exists": "Каталог уже существует: '{path}'",
"folder_not_exist": "Каталог не существует",
"invalid_argument": "Неправильный аргумент '{argument}': {error}",
"invalid_password": "Неправильный пароль",
"ldap_attribute_already_exists": "Атрибут '{attribute}' уже существует со значением '{value}'",
"logged_in": "Вы вошли",
"logged_out": "Вы вышли из системы",
"not_logged_in": "Вы не залогинились",
"operation_interrupted": "Действие прервано",
"password": "Пароль",
"pattern_not_match": "Не соответствует образцу",
"server_already_running": "Сервер уже запущен на этом порте",
"success": "Отлично!",
"unable_authenticate": "Аутентификация невозможна",
"unknown_group": "Неизвестная '{group}' группа",
"unknown_user": "Неизвестный '{user}' пользователь",
"values_mismatch": "Неверные значения",
"warning": "Внимание :",
"websocket_request_expected": "Ожидается запрос WebSocket",
"cannot_open_file": "Не могу открыть файл {file:s} (причина: {error:s})",
"cannot_write_file": "Не могу записать файл {file:s} (причина: {error:s})",
"unknown_error_reading_file": "Неизвестная ошибка при чтении файла {file:s}",
"corrupted_yaml": "Повреждённой yaml получен от {ressource:s} (причина: {error:s})",
"error_writing_file": "Ошибка при записи файла {file:s}: {error:s}",
"error_removing": "Ошибка при удалении {path:s}: {error:s}",
"invalid_url": "Неправильный url {url:s} (этот сайт существует ?)",
"download_ssl_error": "Ошибка SSL при соединении с {url:s}",
"download_timeout": "Превышено время ожидания ответа от {url:s}.",
"download_unknown_error": "Ошибка при загрузке данных с {url:s} : {error:s}",
"instance_already_running": "Процесс уже запущен",
"ldap_operation_error": "Ошибка в процессе работы LDAP",
"root_required": "Чтобы выполнить это действие, вы должны иметь права root",
"corrupted_json": "Повреждённый json получен от {ressource:s} (причина: {error:s})",
"command_unknown": "Команда '{command:s}' неизвестна ?"
}

1
locales/sv.json Normal file
View file

@ -0,0 +1 @@
{}

View file

@ -7,6 +7,7 @@ import random
import string
import crypt
import ldap
import ldap.sasl
import ldap.modlist as modlist
from moulinette.core import MoulinetteError
@ -40,8 +41,11 @@ class Authenticator(BaseAuthenticator):
self.uri = uri
self.basedn = base_dn
if user_rdn:
self.userdn = '%s,%s' % (user_rdn, base_dn)
self.con = None
self.userdn = user_rdn
if 'cn=external,cn=auth' in user_rdn:
self.authenticate(None)
else:
self.con = None
else:
# Initialize anonymous usage
self.userdn = ''
@ -77,7 +81,10 @@ class Authenticator(BaseAuthenticator):
try:
con = ldap.ldapobject.ReconnectLDAPObject(self.uri, retry_max=10, retry_delay=0.5)
if self.userdn:
con.simple_bind_s(self.userdn, password)
if 'cn=external,cn=auth' in self.userdn:
con.sasl_non_interactive_bind_s('EXTERNAL')
else:
con.simple_bind_s(self.userdn, password)
else:
con.simple_bind_s()
except ldap.INVALID_CREDENTIALS:

View file

@ -4,7 +4,6 @@ import os
import time
import json
import logging
import psutil
from importlib import import_module
@ -497,7 +496,10 @@ class MoulinetteLock(object):
"""
if self._locked:
os.unlink(self._lockfile)
if os.path.exists(self._lockfile):
os.unlink(self._lockfile)
else:
logger.warning("Uhoh, somehow the lock %s did not exist ..." % self._lockfile)
logger.debug('lock has been released')
self._locked = False
@ -522,6 +524,7 @@ class MoulinetteLock(object):
return lock_pids
def _is_son_of(self, lock_pids):
import psutil
if lock_pids == []:
return False

View file

@ -1,4 +1,8 @@
DATA_DIR = '/usr/share/moulinette'
LIB_DIR = '/usr/lib/moulinette'
LOCALES_DIR = '/usr/share/moulinette/locale'
CACHE_DIR = '/var/cache/moulinette'
"""Moulinette global configuration core."""
from os import environ
DATA_DIR = environ.get('MOULINETTE_DATA_DIR', '/usr/share/moulinette')
LIB_DIR = environ.get('MOULINETTE_LIB_DIR', '/usr/lib/moulinette')
LOCALES_DIR = environ.get('MOULINETTE_LOCALES_DIR', '/usr/share/moulinette/locale')
CACHE_DIR = environ.get('MOULINETTE_CACHE_DIR', '/var/cache/moulinette')

View file

@ -1,10 +1,13 @@
import os
import yaml
import toml
import errno
import shutil
import json
import grp
from pwd import getpwnam
from collections import OrderedDict
from moulinette import m18n
from moulinette.core import MoulinetteError
@ -31,8 +34,9 @@ def read_file(file_path):
file_content = f.read()
except IOError as e:
raise MoulinetteError('cannot_open_file', file=file_path, error=str(e))
except Exception as e:
raise MoulinetteError('error_reading_file', file=file_path, error=str(e))
except Exception:
raise MoulinetteError('unknown_error_reading_file',
file=file_path, error=str(e))
return file_content
@ -77,6 +81,62 @@ def read_yaml(file_path):
return loaded_yaml
def read_toml(file_path):
"""
Safely read a toml file
Keyword argument:
file_path -- Path to the toml file
"""
# Read file
file_content = read_file(file_path)
# Try to load toml to check if it's syntactically correct
try:
loaded_toml = toml.loads(file_content, _dict=OrderedDict)
except Exception as e:
raise MoulinetteError(errno.EINVAL,
m18n.g('corrupted_toml',
ressource=file_path, error=str(e)))
return loaded_toml
def read_ldif(file_path, filtred_entries=[]):
"""
Safely read a LDIF file and create struct in the same style than
what return the auth objet with the seach method
The main difference with the auth object is that this function return a 2-tuples
with the "dn" and the LDAP entry.
Keyword argument:
file_path -- Path to the ldif file
filtred_entries -- The entries to don't include in the result
"""
from ldif import LDIFRecordList
class LDIFPar(LDIFRecordList):
def handle(self, dn, entry):
for e in filtred_entries:
if e in entry:
entry.pop(e)
self.all_records.append((dn, entry))
# Open file and read content
try:
with open(file_path, "r") as f:
parser = LDIFPar(f)
parser.parse()
except IOError as e:
raise MoulinetteError('cannot_open_file', file=file_path, error=str(e))
except Exception as e:
raise MoulinetteError('unknown_error_reading_file',
file=file_path, error=str(e))
return parser.all_records
def write_to_file(file_path, data, file_mode="w"):
"""
Write a single string or a list of string to a text file.

View file

@ -65,7 +65,8 @@ def call_async_output(args, callback, **kwargs):
# if command does not write in the stdinfo pipe...)
stdinfo_f = os.open(stdinfo, os.O_RDONLY | os.O_NONBLOCK)
else:
kwargs.pop("stdinfo")
if "stdinfo" in kwargs:
kwargs.pop("stdinfo")
stdinfo = None
# Validate callback argument
@ -98,13 +99,15 @@ def call_async_output(args, callback, **kwargs):
# this way is not 100% perfect but should do it
stdout_consum.process_next_line()
stderr_consum.process_next_line()
stdinfo_consum.process_next_line()
if stdinfo:
stdinfo_consum.process_next_line()
time.sleep(.1)
stderr_reader.join()
# clear the queues
stdout_consum.process_current_queue()
stderr_consum.process_current_queue()
stdinfo_consum.process_current_queue()
if stdinfo:
stdinfo_consum.process_current_queue()
else:
while not stdout_reader.eof():
stdout_consum.process_current_queue()

View file

@ -1,106 +0,0 @@
import sys
import moulinette
sys.path.append("..")
###############################################################################
# Tweak moulinette init to have yunohost namespace #
###############################################################################
old_init = moulinette.core.Moulinette18n.__init__
def monkey_path_i18n_init(self, package, default_locale="en"):
old_init(self, package, default_locale)
self.load_namespace("moulinette")
moulinette.core.Moulinette18n.__init__ = monkey_path_i18n_init
###############################################################################
# Tweak translator to raise exceptions if string keys are not defined #
###############################################################################
old_translate = moulinette.core.Translator.translate
def new_translate(self, key, *args, **kwargs):
if key not in self._translations[self.default_locale].keys():
raise KeyError("Unable to retrieve key %s for default locale !" % key)
return old_translate(self, key, *args, **kwargs)
moulinette.core.Translator.translate = new_translate
def new_m18nn(self, key, *args, **kwargs):
return self._global.translate(key, *args, **kwargs)
moulinette.core.Moulinette18n.g = new_m18nn
###############################################################################
# Init the moulinette to have the cli loggers stuff #
###############################################################################
def pytest_cmdline_main(config):
"""Configure logging and initialize the moulinette"""
# Define loggers handlers
handlers = set(['tty'])
root_handlers = set(handlers)
# Define loggers level
level = 'INFO'
tty_level = 'SUCCESS'
# Custom logging configuration
logging = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'tty-debug': {
'format': '%(relativeCreated)-4d %(fmessage)s'
},
'precise': {
'format': '%(asctime)-15s %(levelname)-8s %(name)s %(funcName)s - %(fmessage)s'
},
},
'filters': {
'action': {
'()': 'moulinette.utils.log.ActionFilter',
},
},
'handlers': {
'tty': {
'level': tty_level,
'class': 'moulinette.interfaces.cli.TTYHandler',
'formatter': '',
},
},
'loggers': {
'moulinette': {
'level': level,
'handlers': [],
'propagate': True,
},
'moulinette.interface': {
'level': level,
'handlers': handlers,
'propagate': False,
},
},
'root': {
'level': level,
'handlers': root_handlers,
},
}
# Initialize moulinette
moulinette.init(logging_config=logging, _from_source=False)

View file

@ -1,295 +0,0 @@
# General python lib
import os
import pwd
import pytest
# Moulinette specific
from moulinette.core import MoulinetteError
from moulinette.utils.filesystem import (read_file, read_json,
write_to_file, append_to_file,
write_to_json,
rm,
chmod, chown)
# We define a dummy context with test folders and files
TEST_URL = "https://some.test.url/yolo.txt"
TMP_TEST_DIR = "/tmp/test_iohelpers"
TMP_TEST_FILE = "%s/foofile" % TMP_TEST_DIR
TMP_TEST_JSON = "%s/barjson" % TMP_TEST_DIR
NON_ROOT_USER = "admin"
NON_ROOT_GROUP = "mail"
def setup_function(function):
os.system("rm -rf %s" % TMP_TEST_DIR)
os.system("mkdir %s" % TMP_TEST_DIR)
os.system("echo 'foo\nbar' > %s" % TMP_TEST_FILE)
os.system("echo '{ \"foo\":\"bar\" }' > %s" % TMP_TEST_JSON)
os.system("chmod 700 %s" % TMP_TEST_FILE)
os.system("chmod 700 %s" % TMP_TEST_JSON)
def teardown_function(function):
os.seteuid(0)
os.system("rm -rf /tmp/test_iohelpers/")
# Helper to try stuff as non-root
def switch_to_non_root_user():
nonrootuser = pwd.getpwnam(NON_ROOT_USER).pw_uid
os.seteuid(nonrootuser)
###############################################################################
# Test file read #
###############################################################################
def test_read_file():
content = read_file(TMP_TEST_FILE)
assert content == "foo\nbar\n"
def test_read_file_badfile():
with pytest.raises(MoulinetteError):
read_file(TMP_TEST_FILE + "nope")
def test_read_file_badpermissions():
switch_to_non_root_user()
with pytest.raises(MoulinetteError):
read_file(TMP_TEST_FILE)
def test_read_json():
content = read_json(TMP_TEST_JSON)
assert "foo" in content.keys()
assert content["foo"] == "bar"
def test_read_json_badjson():
os.system("echo '{ not valid json lol }' > %s" % TMP_TEST_JSON)
with pytest.raises(MoulinetteError):
read_json(TMP_TEST_JSON)
###############################################################################
# Test file write #
###############################################################################
def test_write_to_existing_file():
assert os.path.exists(TMP_TEST_FILE)
write_to_file(TMP_TEST_FILE, "yolo\nswag")
assert read_file(TMP_TEST_FILE) == "yolo\nswag"
def test_write_to_new_file():
new_file = "%s/barfile" % TMP_TEST_DIR
assert not os.path.exists(new_file)
write_to_file(new_file, "yolo\nswag")
assert os.path.exists(new_file)
assert read_file(new_file) == "yolo\nswag"
def test_write_to_existing_file_badpermissions():
assert os.path.exists(TMP_TEST_FILE)
switch_to_non_root_user()
with pytest.raises(MoulinetteError):
write_to_file(TMP_TEST_FILE, "yolo\nswag")
def test_write_to_new_file_badpermissions():
switch_to_non_root_user()
new_file = "%s/barfile" % TMP_TEST_DIR
assert not os.path.exists(new_file)
with pytest.raises(MoulinetteError):
write_to_file(new_file, "yolo\nswag")
def test_write_to_folder():
with pytest.raises(AssertionError):
write_to_file(TMP_TEST_DIR, "yolo\nswag")
def test_write_inside_nonexistent_folder():
with pytest.raises(AssertionError):
write_to_file("/toto/test", "yolo\nswag")
def test_write_to_file_with_a_list():
assert os.path.exists(TMP_TEST_FILE)
write_to_file(TMP_TEST_FILE, ["yolo", "swag"])
assert read_file(TMP_TEST_FILE) == "yolo\nswag"
def test_append_to_existing_file():
assert os.path.exists(TMP_TEST_FILE)
append_to_file(TMP_TEST_FILE, "yolo\nswag")
assert read_file(TMP_TEST_FILE) == "foo\nbar\nyolo\nswag"
def test_append_to_new_file():
new_file = "%s/barfile" % TMP_TEST_DIR
assert not os.path.exists(new_file)
append_to_file(new_file, "yolo\nswag")
assert os.path.exists(new_file)
assert read_file(new_file) == "yolo\nswag"
def text_write_dict_to_json():
dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]}
write_to_json(TMP_TEST_FILE, dummy_dict)
j = read_json(TMP_TEST_FILE)
assert "foo" in j.keys()
assert "bar" in j.keys()
assert j["foo"] == 42
assert j["bar"] == ["a", "b", "c"]
assert read_file(TMP_TEST_FILE) == "foo\nbar\nyolo\nswag"
def text_write_list_to_json():
dummy_list = ["foo", "bar", "baz"]
write_to_json(TMP_TEST_FILE, dummy_list)
j = read_json(TMP_TEST_FILE)
assert j == ["foo", "bar", "baz"]
def test_write_to_json_badpermissions():
switch_to_non_root_user()
dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]}
with pytest.raises(MoulinetteError):
write_to_json(TMP_TEST_FILE, dummy_dict)
def test_write_json_inside_nonexistent_folder():
with pytest.raises(AssertionError):
write_to_file("/toto/test.json", ["a", "b"])
###############################################################################
# Test file remove #
###############################################################################
def test_remove_file():
rm(TMP_TEST_FILE)
assert not os.path.exists(TMP_TEST_FILE)
def test_remove_file_badpermissions():
switch_to_non_root_user()
with pytest.raises(MoulinetteError):
rm(TMP_TEST_FILE)
def test_remove_directory():
rm(TMP_TEST_DIR, recursive=True)
assert not os.path.exists(TMP_TEST_DIR)
###############################################################################
# Test permission change #
###############################################################################
def get_permissions(file_path):
from stat import ST_MODE
return (pwd.getpwuid(os.stat(file_path).st_uid).pw_name,
pwd.getpwuid(os.stat(file_path).st_gid).pw_name,
oct(os.stat(file_path)[ST_MODE])[-3:])
# FIXME - should split the test of chown / chmod as independent tests
def set_permissions(f, owner, group, perms):
chown(f, owner, group)
chmod(f, perms)
def test_setpermissions_file():
# Check we're at the default permissions
assert get_permissions(TMP_TEST_FILE) == ("root", "root", "700")
# Change the permissions
set_permissions(TMP_TEST_FILE, NON_ROOT_USER, NON_ROOT_GROUP, 0111)
# Check the permissions got changed
assert get_permissions(TMP_TEST_FILE) == (NON_ROOT_USER, NON_ROOT_GROUP, "111")
# Change the permissions again
set_permissions(TMP_TEST_FILE, "root", "root", 0777)
# Check the permissions got changed
assert get_permissions(TMP_TEST_FILE) == ("root", "root", "777")
def test_setpermissions_directory():
# Check we're at the default permissions
assert get_permissions(TMP_TEST_DIR) == ("root", "root", "755")
# Change the permissions
set_permissions(TMP_TEST_DIR, NON_ROOT_USER, NON_ROOT_GROUP, 0111)
# Check the permissions got changed
assert get_permissions(TMP_TEST_DIR) == (NON_ROOT_USER, NON_ROOT_GROUP, "111")
# Change the permissions again
set_permissions(TMP_TEST_DIR, "root", "root", 0777)
# Check the permissions got changed
assert get_permissions(TMP_TEST_DIR) == ("root", "root", "777")
def test_setpermissions_permissiondenied():
switch_to_non_root_user()
with pytest.raises(MoulinetteError):
set_permissions(TMP_TEST_FILE, NON_ROOT_USER, NON_ROOT_GROUP, 0111)
def test_setpermissions_badfile():
with pytest.raises(MoulinetteError):
set_permissions("/foo/bar/yolo", NON_ROOT_USER, NON_ROOT_GROUP, 0111)
def test_setpermissions_baduser():
with pytest.raises(MoulinetteError):
set_permissions(TMP_TEST_FILE, "foo", NON_ROOT_GROUP, 0111)
def test_setpermissions_badgroup():
with pytest.raises(MoulinetteError):
set_permissions(TMP_TEST_FILE, NON_ROOT_USER, "foo", 0111)

View file

@ -1,90 +0,0 @@
# General python lib
import pytest
import requests
import requests_mock
# Moulinette specific
from moulinette.core import MoulinetteError
from moulinette.utils.network import download_text, download_json
# We define a dummy context with test folders and files
TEST_URL = "https://some.test.url/yolo.txt"
def setup_function(function):
pass
def teardown_function(function):
pass
###############################################################################
# Test download #
###############################################################################
def test_download():
with requests_mock.Mocker() as m:
m.register_uri("GET", TEST_URL, text='some text')
fetched_text = download_text(TEST_URL)
assert fetched_text == "some text"
def test_download_badurl():
with pytest.raises(MoulinetteError):
download_text(TEST_URL)
def test_download_404():
with requests_mock.Mocker() as m:
m.register_uri("GET", TEST_URL, status_code=404)
with pytest.raises(MoulinetteError):
download_text(TEST_URL)
def test_download_sslerror():
with requests_mock.Mocker() as m:
m.register_uri("GET", TEST_URL, exc=requests.exceptions.SSLError)
with pytest.raises(MoulinetteError):
download_text(TEST_URL)
def test_download_timeout():
with requests_mock.Mocker() as m:
m.register_uri("GET", TEST_URL, exc=requests.exceptions.ConnectTimeout)
with pytest.raises(MoulinetteError):
download_text(TEST_URL)
def test_download_json():
with requests_mock.Mocker() as m:
m.register_uri("GET", TEST_URL, text='{ "foo":"bar" }')
fetched_json = download_json(TEST_URL)
assert "foo" in fetched_json.keys()
assert fetched_json["foo"] == "bar"
def test_download_json_badjson():
with requests_mock.Mocker() as m:
m.register_uri("GET", TEST_URL, text='{ not json lol }')
with pytest.raises(MoulinetteError):
download_json(TEST_URL)

View file

@ -1,66 +0,0 @@
# General python lib
import os
import pwd
import pytest
# Moulinette specific
from subprocess import CalledProcessError
from moulinette.utils.process import run_commands
# We define a dummy context with test folders and files
TMP_TEST_DIR = "/tmp/test_iohelpers"
TMP_TEST_FILE = "%s/foofile" % TMP_TEST_DIR
NON_ROOT_USER = "admin"
NON_ROOT_GROUP = "mail"
def setup_function(function):
os.system("rm -rf %s" % TMP_TEST_DIR)
os.system("mkdir %s" % TMP_TEST_DIR)
os.system("echo 'foo\nbar' > %s" % TMP_TEST_FILE)
os.system("chmod 700 %s" % TMP_TEST_FILE)
def teardown_function(function):
os.seteuid(0)
os.system("rm -rf /tmp/test_iohelpers/")
# Helper to try stuff as non-root
def switch_to_non_root_user():
nonrootuser = pwd.getpwnam(NON_ROOT_USER).pw_uid
os.seteuid(nonrootuser)
###############################################################################
# Test run shell commands #
###############################################################################
def test_run_shell_command_list():
commands = ["rm -f %s" % TMP_TEST_FILE]
assert os.path.exists(TMP_TEST_FILE)
run_commands(commands)
assert not os.path.exists(TMP_TEST_FILE)
def test_run_shell_badcommand():
commands = ["yolo swag"]
with pytest.raises(CalledProcessError):
run_commands(commands)
def test_run_shell_command_badpermissions():
commands = ["rm -f %s" % TMP_TEST_FILE]
switch_to_non_root_user()
with pytest.raises(CalledProcessError):
run_commands(commands)

6
pytest.ini Normal file
View file

@ -0,0 +1,6 @@
[pytest]
addopts = --cov=moulinette -s -v --no-cov-on-fail
norecursedirs = dist doc build .tox .eggs
testpaths = test/
env =
MOULINETTE_LOCALES_DIR = {PWD}/locales

2
setup.cfg Normal file
View file

@ -0,0 +1,2 @@
[pep8]
ignore = E501,E128,E731

View file

@ -1,12 +1,11 @@
#!/usr/bin/env python
import os
import sys
from distutils.core import setup
from moulinette.globals import LOCALES_DIR
# Extend installation
locale_files = []
@ -31,5 +30,19 @@ setup(name='Moulinette',
'moulinette.utils',
],
data_files=[(LOCALES_DIR, locale_files)],
tests_require=["pytest", "webtest"],
python_requires='==2.7.*',
install_requires=[
'argcomplete',
'psutil',
'pytz',
'pyyaml',
],
tests_require=[
'pytest',
'pytest-cov',
'pytest-env',
'pytest-mock',
'requests',
'requests-mock',
],
)

0
lib/test/__init__.py → test/__init__.py Executable file → Normal file
View file

128
test/conftest.py Normal file
View file

@ -0,0 +1,128 @@
"""Pytest fixtures for testing."""
import json
import os
import pytest
def patch_init(moulinette):
"""Configure moulinette to use the YunoHost namespace."""
old_init = moulinette.core.Moulinette18n.__init__
def monkey_path_i18n_init(self, package, default_locale='en'):
old_init(self, package, default_locale)
self.load_namespace('moulinette')
moulinette.core.Moulinette18n.__init__ = monkey_path_i18n_init
def patch_translate(moulinette):
"""Configure translator to raise errors when there are missing keys."""
old_translate = moulinette.core.Translator.translate
def new_translate(self, key, *args, **kwargs):
if key not in self._translations[self.default_locale].keys():
message = 'Unable to retrieve key %s for default locale!' % key
raise KeyError(message)
return old_translate(self, key, *args, **kwargs)
moulinette.core.Translator.translate = new_translate
def new_m18nn(self, key, *args, **kwargs):
return self._global.translate(key, *args, **kwargs)
moulinette.core.Moulinette18n.g = new_m18nn
def patch_logging(moulinette):
"""Configure logging to use the custom logger."""
handlers = set(['tty'])
root_handlers = set(handlers)
level = 'INFO'
tty_level = 'SUCCESS'
logging = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'tty-debug': {
'format': '%(relativeCreated)-4d %(fmessage)s'
},
'precise': {
'format': '%(asctime)-15s %(levelname)-8s %(name)s %(funcName)s - %(fmessage)s' # noqa
},
},
'filters': {
'action': {
'()': 'moulinette.utils.log.ActionFilter',
},
},
'handlers': {
'tty': {
'level': tty_level,
'class': 'moulinette.interfaces.cli.TTYHandler',
'formatter': '',
},
},
'loggers': {
'moulinette': {
'level': level,
'handlers': [],
'propagate': True,
},
'moulinette.interface': {
'level': level,
'handlers': handlers,
'propagate': False,
},
},
'root': {
'level': level,
'handlers': root_handlers,
},
}
moulinette.init(
logging_config=logging,
_from_source=False
)
@pytest.fixture(scope='session', autouse=True)
def moulinette():
import moulinette
patch_init(moulinette)
patch_translate(moulinette)
patch_logging(moulinette)
return moulinette
@pytest.fixture
def test_file(tmp_path):
test_text = 'foo\nbar\n'
test_file = tmp_path / 'test.txt'
test_file.write_bytes(test_text)
return test_file
@pytest.fixture
def test_json(tmp_path):
test_json = json.dumps({'foo': 'bar'})
test_file = tmp_path / 'test.json'
test_file.write_bytes(test_json)
return test_file
@pytest.fixture
def user():
return os.getlogin()
@pytest.fixture
def test_url():
return 'https://some.test.url/yolo.txt'

177
test/test_filesystem.py Normal file
View file

@ -0,0 +1,177 @@
import os
import pytest
from moulinette import m18n
from moulinette.core import MoulinetteError
from moulinette.utils.filesystem import (append_to_file, read_file, read_json,
rm, write_to_file, write_to_json)
def test_read_file(test_file):
content = read_file(str(test_file))
assert content == 'foo\nbar\n'
def test_read_file_missing_file():
bad_file = 'doesnt-exist'
with pytest.raises(MoulinetteError) as exception:
read_file(bad_file)
translation = m18n.g('file_not_exist')
expected_msg = translation.format(path=bad_file)
assert expected_msg in str(exception)
def test_read_file_cannot_read_ioerror(test_file, mocker):
error = 'foobar'
with mocker.patch('__builtin__.open', side_effect=IOError(error)):
with pytest.raises(MoulinetteError) as exception:
read_file(str(test_file))
translation = m18n.g('cannot_open_file')
expected_msg = translation.format(file=str(test_file), error=error)
assert expected_msg in str(exception)
def test_read_json(test_json):
content = read_json(str(test_json))
assert 'foo' in content.keys()
assert content['foo'] == 'bar'
def test_read_json_cannot_read(test_json, mocker):
error = 'foobar'
with mocker.patch('json.loads', side_effect=ValueError(error)):
with pytest.raises(MoulinetteError) as exception:
read_json(str(test_json))
translation = m18n.g('corrupted_json')
expected_msg = translation.format(ressource=str(test_json), error=error)
assert expected_msg in str(exception)
def test_write_to_existing_file(test_file):
write_to_file(str(test_file), 'yolo\nswag')
assert read_file(str(test_file)) == 'yolo\nswag'
def test_write_to_new_file(tmp_path):
new_file = tmp_path / 'newfile.txt'
write_to_file(str(new_file), 'yolo\nswag')
assert os.path.exists(str(new_file))
assert read_file(str(new_file)) == 'yolo\nswag'
def test_write_to_existing_file_bad_perms(test_file, mocker):
error = 'foobar'
with mocker.patch('__builtin__.open', side_effect=IOError(error)):
with pytest.raises(MoulinetteError) as exception:
write_to_file(str(test_file), 'yolo\nswag')
translation = m18n.g('cannot_write_file')
expected_msg = translation.format(file=str(test_file), error=error)
assert expected_msg in str(exception)
def test_write_cannot_write_folder(tmp_path):
with pytest.raises(AssertionError):
write_to_file(str(tmp_path), 'yolo\nswag')
def test_write_cannot_write_to_non_existant_folder():
with pytest.raises(AssertionError):
write_to_file('/toto/test', 'yolo\nswag')
def test_write_to_file_with_a_list(test_file):
write_to_file(str(test_file), ['yolo', 'swag'])
assert read_file(str(test_file)) == 'yolo\nswag'
def test_append_to_existing_file(test_file):
append_to_file(str(test_file), 'yolo\nswag')
assert read_file(str(test_file)) == 'foo\nbar\nyolo\nswag'
def test_append_to_new_file(tmp_path):
new_file = tmp_path / 'newfile.txt'
append_to_file(str(new_file), 'yolo\nswag')
assert os.path.exists(str(new_file))
assert read_file(str(new_file)) == 'yolo\nswag'
def text_write_dict_to_json(tmp_path):
new_file = tmp_path / 'newfile.json'
dummy_dict = {'foo': 42, 'bar': ['a', 'b', 'c']}
write_to_json(str(new_file), dummy_dict)
_json = read_json(str(new_file))
assert 'foo' in _json.keys()
assert 'bar' in _json.keys()
assert _json['foo'] == 42
assert _json['bar'] == ['a', 'b', 'c']
def text_write_list_to_json(tmp_path):
new_file = tmp_path / 'newfile.json'
dummy_list = ['foo', 'bar', 'baz']
write_to_json(str(new_file), dummy_list)
_json = read_json(str(new_file))
assert _json == ['foo', 'bar', 'baz']
def test_write_to_json_bad_perms(test_json, mocker):
error = 'foobar'
with mocker.patch('__builtin__.open', side_effect=IOError(error)):
with pytest.raises(MoulinetteError) as exception:
write_to_json(str(test_json), {'a': 1})
translation = m18n.g('cannot_write_file')
expected_msg = translation.format(file=str(test_json), error=error)
assert expected_msg in str(exception)
def test_write_json_cannot_write_to_non_existant_folder():
with pytest.raises(AssertionError):
write_to_json('/toto/test.json', ['a', 'b'])
def test_remove_file(test_file):
assert os.path.exists(str(test_file))
rm(str(test_file))
assert not os.path.exists(str(test_file))
def test_remove_file_bad_perms(test_file, mocker):
error = 'foobar'
with mocker.patch('os.remove', side_effect=OSError(error)):
with pytest.raises(MoulinetteError) as exception:
rm(str(test_file))
translation = m18n.g('error_removing')
expected_msg = translation.format(path=str(test_file), error=error)
assert expected_msg in str(exception)
def test_remove_directory(tmp_path):
test_dir = tmp_path / "foo"
test_dir.mkdir()
assert os.path.exists(str(test_dir))
rm(str(test_dir), recursive=True)
assert not os.path.exists(str(test_dir))

56
test/test_network.py Normal file
View file

@ -0,0 +1,56 @@
import pytest
import requests
import requests_mock
from moulinette.core import MoulinetteError
from moulinette.utils.network import download_json, download_text
def test_download(test_url):
with requests_mock.Mocker() as mock:
mock.register_uri('GET', test_url, text='some text')
fetched_text = download_text(test_url)
assert fetched_text == 'some text'
def test_download_bad_url():
with pytest.raises(MoulinetteError):
download_text('Nowhere')
def test_download_404(test_url):
with requests_mock.Mocker() as mock:
mock.register_uri('GET', test_url, status_code=404)
with pytest.raises(MoulinetteError):
download_text(test_url)
def test_download_ssl_error(test_url):
with requests_mock.Mocker() as mock:
exception = requests.exceptions.SSLError
mock.register_uri('GET', test_url, exc=exception)
with pytest.raises(MoulinetteError):
download_text(test_url)
def test_download_timeout(test_url):
with requests_mock.Mocker() as mock:
exception = requests.exceptions.ConnectTimeout
mock.register_uri('GET', test_url, exc=exception)
with pytest.raises(MoulinetteError):
download_text(test_url)
def test_download_json(test_url):
with requests_mock.Mocker() as mock:
mock.register_uri('GET', test_url, text='{"foo":"bar"}')
fetched_json = download_json(test_url)
assert 'foo' in fetched_json.keys()
assert fetched_json['foo'] == 'bar'
def test_download_json_bad_json(test_url):
with requests_mock.Mocker() as mock:
mock.register_uri('GET', test_url, text='notjsonlol')
with pytest.raises(MoulinetteError):
download_json(test_url)

17
test/test_process.py Normal file
View file

@ -0,0 +1,17 @@
import os
from subprocess import CalledProcessError
import pytest
from moulinette.utils.process import run_commands
def test_run_shell_command_list(test_file):
assert os.path.exists(str(test_file))
run_commands(['rm -f %s' % str(test_file)])
assert not os.path.exists(str(test_file))
def test_run_shell_bad_cmd():
with pytest.raises(CalledProcessError):
run_commands(['yolo swag'])

View file

@ -1,100 +0,0 @@
# -*- coding: utf-8 -*-
from webtest import TestApp as WebTestApp
from bottle import Bottle
from moulinette.interfaces.api import filter_csrf
URLENCODED = 'application/x-www-form-urlencoded'
FORMDATA = 'multipart/form-data'
TEXT = 'text/plain'
TYPES = [URLENCODED, FORMDATA, TEXT]
SAFE_METHODS = ["HEAD", "GET", "PUT", "DELETE"]
app = Bottle(autojson=True)
app.install(filter_csrf)
@app.get('/')
def get_hello():
return "Hello World!\n"
@app.post('/')
def post_hello():
return "OK\n"
@app.put('/')
def put_hello():
return "OK\n"
@app.delete('/')
def delete_hello():
return "OK\n"
webtest = WebTestApp(app)
def test_get():
r = webtest.get("/")
assert r.status_code == 200
def test_csrf_post():
r = webtest.post("/", "test", expect_errors=True)
assert r.status_code == 403
def test_post_json():
r = webtest.post("/", "test",
headers=[("Content-Type", "application/json")])
assert r.status_code == 200
def test_csrf_post_text():
r = webtest.post("/", "test",
headers=[("Content-Type", "text/plain")],
expect_errors=True)
assert r.status_code == 403
def test_csrf_post_urlencoded():
r = webtest.post("/", "test",
headers=[("Content-Type",
"application/x-www-form-urlencoded")],
expect_errors=True)
assert r.status_code == 403
def test_csrf_post_form():
r = webtest.post("/", "test",
headers=[("Content-Type", "multipart/form-data")],
expect_errors=True)
assert r.status_code == 403
def test_ok_post_text():
r = webtest.post("/", "test",
headers=[("Content-Type", "text/plain"),
("X-Requested-With", "XMLHttpRequest")])
assert r.status_code == 200
def test_ok_post_urlencoded():
r = webtest.post("/", "test",
headers=[("Content-Type",
"application/x-www-form-urlencoded"),
("X-Requested-With", "XMLHttpRequest")])
assert r.status_code == 200
def test_ok_post_form():
r = webtest.post("/", "test",
headers=[("Content-Type", "multipart/form-data"),
("X-Requested-With", "XMLHttpRequest")])
assert r.status_code == 200

17
tox.ini Normal file
View file

@ -0,0 +1,17 @@
[tox]
envlist = py27
skipdist = True
isolated_build = True
[testenv]
usedevelop = True
passenv = *
deps =
pytest >= 4.6.3, < 5.0
pytest-cov >= 2.7.1, < 3.0
pytest-mock >= 1.10.4, < 2.0
pytest-env >= 0.6.2, < 1.0
requests >= 2.22.0, < 3.0
requests-mock >= 1.6.0, < 2.0
commands =
pytest {posargs}