mirror of
https://github.com/YunoHost/moulinette.git
synced 2024-09-03 20:06:31 +02:00
Migrate utility tests to Pytest top level folder
This commit is contained in:
parent
5744dbd6fa
commit
ee1c67811b
9 changed files with 378 additions and 557 deletions
|
@ -1,106 +0,0 @@
|
|||
import sys
|
||||
import moulinette
|
||||
|
||||
sys.path.append("..")
|
||||
|
||||
###############################################################################
|
||||
# Tweak moulinette init to have yunohost namespace #
|
||||
###############################################################################
|
||||
|
||||
|
||||
old_init = moulinette.core.Moulinette18n.__init__
|
||||
|
||||
|
||||
def monkey_path_i18n_init(self, package, default_locale="en"):
|
||||
old_init(self, package, default_locale)
|
||||
self.load_namespace("moulinette")
|
||||
|
||||
|
||||
moulinette.core.Moulinette18n.__init__ = monkey_path_i18n_init
|
||||
|
||||
|
||||
###############################################################################
|
||||
# Tweak translator to raise exceptions if string keys are not defined #
|
||||
###############################################################################
|
||||
|
||||
|
||||
old_translate = moulinette.core.Translator.translate
|
||||
|
||||
|
||||
def new_translate(self, key, *args, **kwargs):
|
||||
|
||||
if key not in self._translations[self.default_locale].keys():
|
||||
raise KeyError("Unable to retrieve key %s for default locale !" % key)
|
||||
|
||||
return old_translate(self, key, *args, **kwargs)
|
||||
|
||||
|
||||
moulinette.core.Translator.translate = new_translate
|
||||
|
||||
|
||||
def new_m18nn(self, key, *args, **kwargs):
|
||||
return self._global.translate(key, *args, **kwargs)
|
||||
|
||||
|
||||
moulinette.core.Moulinette18n.g = new_m18nn
|
||||
|
||||
|
||||
###############################################################################
|
||||
# Init the moulinette to have the cli loggers stuff #
|
||||
###############################################################################
|
||||
|
||||
|
||||
def pytest_cmdline_main(config):
|
||||
"""Configure logging and initialize the moulinette"""
|
||||
# Define loggers handlers
|
||||
handlers = set(['tty'])
|
||||
root_handlers = set(handlers)
|
||||
|
||||
# Define loggers level
|
||||
level = 'INFO'
|
||||
tty_level = 'SUCCESS'
|
||||
|
||||
# Custom logging configuration
|
||||
logging = {
|
||||
'version': 1,
|
||||
'disable_existing_loggers': True,
|
||||
'formatters': {
|
||||
'tty-debug': {
|
||||
'format': '%(relativeCreated)-4d %(fmessage)s'
|
||||
},
|
||||
'precise': {
|
||||
'format': '%(asctime)-15s %(levelname)-8s %(name)s %(funcName)s - %(fmessage)s'
|
||||
},
|
||||
},
|
||||
'filters': {
|
||||
'action': {
|
||||
'()': 'moulinette.utils.log.ActionFilter',
|
||||
},
|
||||
},
|
||||
'handlers': {
|
||||
'tty': {
|
||||
'level': tty_level,
|
||||
'class': 'moulinette.interfaces.cli.TTYHandler',
|
||||
'formatter': '',
|
||||
},
|
||||
},
|
||||
'loggers': {
|
||||
'moulinette': {
|
||||
'level': level,
|
||||
'handlers': [],
|
||||
'propagate': True,
|
||||
},
|
||||
'moulinette.interface': {
|
||||
'level': level,
|
||||
'handlers': handlers,
|
||||
'propagate': False,
|
||||
},
|
||||
},
|
||||
'root': {
|
||||
'level': level,
|
||||
'handlers': root_handlers,
|
||||
},
|
||||
}
|
||||
|
||||
# Initialize moulinette
|
||||
moulinette.init(logging_config=logging, _from_source=False)
|
|
@ -1,295 +0,0 @@
|
|||
|
||||
# General python lib
|
||||
import os
|
||||
import pwd
|
||||
import pytest
|
||||
|
||||
# Moulinette specific
|
||||
from moulinette.core import MoulinetteError
|
||||
from moulinette.utils.filesystem import (read_file, read_json,
|
||||
write_to_file, append_to_file,
|
||||
write_to_json,
|
||||
rm,
|
||||
chmod, chown)
|
||||
|
||||
# We define a dummy context with test folders and files
|
||||
|
||||
TEST_URL = "https://some.test.url/yolo.txt"
|
||||
TMP_TEST_DIR = "/tmp/test_iohelpers"
|
||||
TMP_TEST_FILE = "%s/foofile" % TMP_TEST_DIR
|
||||
TMP_TEST_JSON = "%s/barjson" % TMP_TEST_DIR
|
||||
NON_ROOT_USER = "admin"
|
||||
NON_ROOT_GROUP = "mail"
|
||||
|
||||
|
||||
def setup_function(function):
|
||||
|
||||
os.system("rm -rf %s" % TMP_TEST_DIR)
|
||||
os.system("mkdir %s" % TMP_TEST_DIR)
|
||||
os.system("echo 'foo\nbar' > %s" % TMP_TEST_FILE)
|
||||
os.system("echo '{ \"foo\":\"bar\" }' > %s" % TMP_TEST_JSON)
|
||||
os.system("chmod 700 %s" % TMP_TEST_FILE)
|
||||
os.system("chmod 700 %s" % TMP_TEST_JSON)
|
||||
|
||||
|
||||
def teardown_function(function):
|
||||
|
||||
os.seteuid(0)
|
||||
os.system("rm -rf /tmp/test_iohelpers/")
|
||||
|
||||
|
||||
# Helper to try stuff as non-root
|
||||
def switch_to_non_root_user():
|
||||
|
||||
nonrootuser = pwd.getpwnam(NON_ROOT_USER).pw_uid
|
||||
os.seteuid(nonrootuser)
|
||||
|
||||
|
||||
###############################################################################
|
||||
# Test file read #
|
||||
###############################################################################
|
||||
|
||||
|
||||
def test_read_file():
|
||||
|
||||
content = read_file(TMP_TEST_FILE)
|
||||
assert content == "foo\nbar\n"
|
||||
|
||||
|
||||
def test_read_file_badfile():
|
||||
|
||||
with pytest.raises(MoulinetteError):
|
||||
read_file(TMP_TEST_FILE + "nope")
|
||||
|
||||
|
||||
def test_read_file_badpermissions():
|
||||
|
||||
switch_to_non_root_user()
|
||||
with pytest.raises(MoulinetteError):
|
||||
read_file(TMP_TEST_FILE)
|
||||
|
||||
|
||||
def test_read_json():
|
||||
|
||||
content = read_json(TMP_TEST_JSON)
|
||||
assert "foo" in content.keys()
|
||||
assert content["foo"] == "bar"
|
||||
|
||||
|
||||
def test_read_json_badjson():
|
||||
|
||||
os.system("echo '{ not valid json lol }' > %s" % TMP_TEST_JSON)
|
||||
|
||||
with pytest.raises(MoulinetteError):
|
||||
read_json(TMP_TEST_JSON)
|
||||
|
||||
|
||||
###############################################################################
|
||||
# Test file write #
|
||||
###############################################################################
|
||||
|
||||
|
||||
def test_write_to_existing_file():
|
||||
|
||||
assert os.path.exists(TMP_TEST_FILE)
|
||||
write_to_file(TMP_TEST_FILE, "yolo\nswag")
|
||||
assert read_file(TMP_TEST_FILE) == "yolo\nswag"
|
||||
|
||||
|
||||
def test_write_to_new_file():
|
||||
|
||||
new_file = "%s/barfile" % TMP_TEST_DIR
|
||||
assert not os.path.exists(new_file)
|
||||
write_to_file(new_file, "yolo\nswag")
|
||||
assert os.path.exists(new_file)
|
||||
assert read_file(new_file) == "yolo\nswag"
|
||||
|
||||
|
||||
def test_write_to_existing_file_badpermissions():
|
||||
|
||||
assert os.path.exists(TMP_TEST_FILE)
|
||||
switch_to_non_root_user()
|
||||
with pytest.raises(MoulinetteError):
|
||||
write_to_file(TMP_TEST_FILE, "yolo\nswag")
|
||||
|
||||
|
||||
def test_write_to_new_file_badpermissions():
|
||||
|
||||
switch_to_non_root_user()
|
||||
new_file = "%s/barfile" % TMP_TEST_DIR
|
||||
assert not os.path.exists(new_file)
|
||||
with pytest.raises(MoulinetteError):
|
||||
write_to_file(new_file, "yolo\nswag")
|
||||
|
||||
|
||||
def test_write_to_folder():
|
||||
|
||||
with pytest.raises(AssertionError):
|
||||
write_to_file(TMP_TEST_DIR, "yolo\nswag")
|
||||
|
||||
|
||||
def test_write_inside_nonexistent_folder():
|
||||
|
||||
with pytest.raises(AssertionError):
|
||||
write_to_file("/toto/test", "yolo\nswag")
|
||||
|
||||
|
||||
def test_write_to_file_with_a_list():
|
||||
|
||||
assert os.path.exists(TMP_TEST_FILE)
|
||||
write_to_file(TMP_TEST_FILE, ["yolo", "swag"])
|
||||
assert read_file(TMP_TEST_FILE) == "yolo\nswag"
|
||||
|
||||
|
||||
def test_append_to_existing_file():
|
||||
|
||||
assert os.path.exists(TMP_TEST_FILE)
|
||||
append_to_file(TMP_TEST_FILE, "yolo\nswag")
|
||||
assert read_file(TMP_TEST_FILE) == "foo\nbar\nyolo\nswag"
|
||||
|
||||
|
||||
def test_append_to_new_file():
|
||||
|
||||
new_file = "%s/barfile" % TMP_TEST_DIR
|
||||
assert not os.path.exists(new_file)
|
||||
append_to_file(new_file, "yolo\nswag")
|
||||
assert os.path.exists(new_file)
|
||||
assert read_file(new_file) == "yolo\nswag"
|
||||
|
||||
|
||||
def text_write_dict_to_json():
|
||||
|
||||
dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]}
|
||||
write_to_json(TMP_TEST_FILE, dummy_dict)
|
||||
j = read_json(TMP_TEST_FILE)
|
||||
assert "foo" in j.keys()
|
||||
assert "bar" in j.keys()
|
||||
assert j["foo"] == 42
|
||||
assert j["bar"] == ["a", "b", "c"]
|
||||
assert read_file(TMP_TEST_FILE) == "foo\nbar\nyolo\nswag"
|
||||
|
||||
|
||||
def text_write_list_to_json():
|
||||
|
||||
dummy_list = ["foo", "bar", "baz"]
|
||||
write_to_json(TMP_TEST_FILE, dummy_list)
|
||||
j = read_json(TMP_TEST_FILE)
|
||||
assert j == ["foo", "bar", "baz"]
|
||||
|
||||
|
||||
def test_write_to_json_badpermissions():
|
||||
|
||||
switch_to_non_root_user()
|
||||
dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]}
|
||||
with pytest.raises(MoulinetteError):
|
||||
write_to_json(TMP_TEST_FILE, dummy_dict)
|
||||
|
||||
|
||||
def test_write_json_inside_nonexistent_folder():
|
||||
|
||||
with pytest.raises(AssertionError):
|
||||
write_to_file("/toto/test.json", ["a", "b"])
|
||||
|
||||
|
||||
###############################################################################
|
||||
# Test file remove #
|
||||
###############################################################################
|
||||
|
||||
|
||||
def test_remove_file():
|
||||
|
||||
rm(TMP_TEST_FILE)
|
||||
assert not os.path.exists(TMP_TEST_FILE)
|
||||
|
||||
|
||||
def test_remove_file_badpermissions():
|
||||
|
||||
switch_to_non_root_user()
|
||||
with pytest.raises(MoulinetteError):
|
||||
rm(TMP_TEST_FILE)
|
||||
|
||||
|
||||
def test_remove_directory():
|
||||
|
||||
rm(TMP_TEST_DIR, recursive=True)
|
||||
assert not os.path.exists(TMP_TEST_DIR)
|
||||
|
||||
|
||||
###############################################################################
|
||||
# Test permission change #
|
||||
###############################################################################
|
||||
|
||||
|
||||
def get_permissions(file_path):
|
||||
from stat import ST_MODE
|
||||
return (pwd.getpwuid(os.stat(file_path).st_uid).pw_name,
|
||||
pwd.getpwuid(os.stat(file_path).st_gid).pw_name,
|
||||
oct(os.stat(file_path)[ST_MODE])[-3:])
|
||||
|
||||
|
||||
# FIXME - should split the test of chown / chmod as independent tests
|
||||
def set_permissions(f, owner, group, perms):
|
||||
chown(f, owner, group)
|
||||
chmod(f, perms)
|
||||
|
||||
|
||||
def test_setpermissions_file():
|
||||
|
||||
# Check we're at the default permissions
|
||||
assert get_permissions(TMP_TEST_FILE) == ("root", "root", "700")
|
||||
|
||||
# Change the permissions
|
||||
set_permissions(TMP_TEST_FILE, NON_ROOT_USER, NON_ROOT_GROUP, 0111)
|
||||
|
||||
# Check the permissions got changed
|
||||
assert get_permissions(TMP_TEST_FILE) == (NON_ROOT_USER, NON_ROOT_GROUP, "111")
|
||||
|
||||
# Change the permissions again
|
||||
set_permissions(TMP_TEST_FILE, "root", "root", 0777)
|
||||
|
||||
# Check the permissions got changed
|
||||
assert get_permissions(TMP_TEST_FILE) == ("root", "root", "777")
|
||||
|
||||
|
||||
def test_setpermissions_directory():
|
||||
|
||||
# Check we're at the default permissions
|
||||
assert get_permissions(TMP_TEST_DIR) == ("root", "root", "755")
|
||||
|
||||
# Change the permissions
|
||||
set_permissions(TMP_TEST_DIR, NON_ROOT_USER, NON_ROOT_GROUP, 0111)
|
||||
|
||||
# Check the permissions got changed
|
||||
assert get_permissions(TMP_TEST_DIR) == (NON_ROOT_USER, NON_ROOT_GROUP, "111")
|
||||
|
||||
# Change the permissions again
|
||||
set_permissions(TMP_TEST_DIR, "root", "root", 0777)
|
||||
|
||||
# Check the permissions got changed
|
||||
assert get_permissions(TMP_TEST_DIR) == ("root", "root", "777")
|
||||
|
||||
|
||||
def test_setpermissions_permissiondenied():
|
||||
|
||||
switch_to_non_root_user()
|
||||
|
||||
with pytest.raises(MoulinetteError):
|
||||
set_permissions(TMP_TEST_FILE, NON_ROOT_USER, NON_ROOT_GROUP, 0111)
|
||||
|
||||
|
||||
def test_setpermissions_badfile():
|
||||
|
||||
with pytest.raises(MoulinetteError):
|
||||
set_permissions("/foo/bar/yolo", NON_ROOT_USER, NON_ROOT_GROUP, 0111)
|
||||
|
||||
|
||||
def test_setpermissions_baduser():
|
||||
|
||||
with pytest.raises(MoulinetteError):
|
||||
set_permissions(TMP_TEST_FILE, "foo", NON_ROOT_GROUP, 0111)
|
||||
|
||||
|
||||
def test_setpermissions_badgroup():
|
||||
|
||||
with pytest.raises(MoulinetteError):
|
||||
set_permissions(TMP_TEST_FILE, NON_ROOT_USER, "foo", 0111)
|
|
@ -1,90 +0,0 @@
|
|||
|
||||
# General python lib
|
||||
import pytest
|
||||
import requests
|
||||
import requests_mock
|
||||
|
||||
# Moulinette specific
|
||||
from moulinette.core import MoulinetteError
|
||||
from moulinette.utils.network import download_text, download_json
|
||||
|
||||
# We define a dummy context with test folders and files
|
||||
|
||||
TEST_URL = "https://some.test.url/yolo.txt"
|
||||
|
||||
|
||||
def setup_function(function):
|
||||
|
||||
pass
|
||||
|
||||
|
||||
def teardown_function(function):
|
||||
|
||||
pass
|
||||
|
||||
###############################################################################
|
||||
# Test download #
|
||||
###############################################################################
|
||||
|
||||
|
||||
def test_download():
|
||||
|
||||
with requests_mock.Mocker() as m:
|
||||
m.register_uri("GET", TEST_URL, text='some text')
|
||||
|
||||
fetched_text = download_text(TEST_URL)
|
||||
|
||||
assert fetched_text == "some text"
|
||||
|
||||
|
||||
def test_download_badurl():
|
||||
|
||||
with pytest.raises(MoulinetteError):
|
||||
download_text(TEST_URL)
|
||||
|
||||
|
||||
def test_download_404():
|
||||
|
||||
with requests_mock.Mocker() as m:
|
||||
m.register_uri("GET", TEST_URL, status_code=404)
|
||||
|
||||
with pytest.raises(MoulinetteError):
|
||||
download_text(TEST_URL)
|
||||
|
||||
|
||||
def test_download_sslerror():
|
||||
|
||||
with requests_mock.Mocker() as m:
|
||||
m.register_uri("GET", TEST_URL, exc=requests.exceptions.SSLError)
|
||||
|
||||
with pytest.raises(MoulinetteError):
|
||||
download_text(TEST_URL)
|
||||
|
||||
|
||||
def test_download_timeout():
|
||||
|
||||
with requests_mock.Mocker() as m:
|
||||
m.register_uri("GET", TEST_URL, exc=requests.exceptions.ConnectTimeout)
|
||||
|
||||
with pytest.raises(MoulinetteError):
|
||||
download_text(TEST_URL)
|
||||
|
||||
|
||||
def test_download_json():
|
||||
|
||||
with requests_mock.Mocker() as m:
|
||||
m.register_uri("GET", TEST_URL, text='{ "foo":"bar" }')
|
||||
|
||||
fetched_json = download_json(TEST_URL)
|
||||
|
||||
assert "foo" in fetched_json.keys()
|
||||
assert fetched_json["foo"] == "bar"
|
||||
|
||||
|
||||
def test_download_json_badjson():
|
||||
|
||||
with requests_mock.Mocker() as m:
|
||||
m.register_uri("GET", TEST_URL, text='{ not json lol }')
|
||||
|
||||
with pytest.raises(MoulinetteError):
|
||||
download_json(TEST_URL)
|
|
@ -1,66 +0,0 @@
|
|||
# General python lib
|
||||
import os
|
||||
import pwd
|
||||
import pytest
|
||||
|
||||
# Moulinette specific
|
||||
from subprocess import CalledProcessError
|
||||
from moulinette.utils.process import run_commands
|
||||
|
||||
# We define a dummy context with test folders and files
|
||||
|
||||
TMP_TEST_DIR = "/tmp/test_iohelpers"
|
||||
TMP_TEST_FILE = "%s/foofile" % TMP_TEST_DIR
|
||||
NON_ROOT_USER = "admin"
|
||||
NON_ROOT_GROUP = "mail"
|
||||
|
||||
|
||||
def setup_function(function):
|
||||
|
||||
os.system("rm -rf %s" % TMP_TEST_DIR)
|
||||
os.system("mkdir %s" % TMP_TEST_DIR)
|
||||
os.system("echo 'foo\nbar' > %s" % TMP_TEST_FILE)
|
||||
os.system("chmod 700 %s" % TMP_TEST_FILE)
|
||||
|
||||
|
||||
def teardown_function(function):
|
||||
|
||||
os.seteuid(0)
|
||||
os.system("rm -rf /tmp/test_iohelpers/")
|
||||
|
||||
|
||||
# Helper to try stuff as non-root
|
||||
def switch_to_non_root_user():
|
||||
|
||||
nonrootuser = pwd.getpwnam(NON_ROOT_USER).pw_uid
|
||||
os.seteuid(nonrootuser)
|
||||
|
||||
###############################################################################
|
||||
# Test run shell commands #
|
||||
###############################################################################
|
||||
|
||||
|
||||
def test_run_shell_command_list():
|
||||
|
||||
commands = ["rm -f %s" % TMP_TEST_FILE]
|
||||
|
||||
assert os.path.exists(TMP_TEST_FILE)
|
||||
run_commands(commands)
|
||||
assert not os.path.exists(TMP_TEST_FILE)
|
||||
|
||||
|
||||
def test_run_shell_badcommand():
|
||||
|
||||
commands = ["yolo swag"]
|
||||
|
||||
with pytest.raises(CalledProcessError):
|
||||
run_commands(commands)
|
||||
|
||||
|
||||
def test_run_shell_command_badpermissions():
|
||||
|
||||
commands = ["rm -f %s" % TMP_TEST_FILE]
|
||||
|
||||
switch_to_non_root_user()
|
||||
with pytest.raises(CalledProcessError):
|
||||
run_commands(commands)
|
0
test/__init__.py
Normal file
0
test/__init__.py
Normal file
128
test/conftest.py
Normal file
128
test/conftest.py
Normal file
|
@ -0,0 +1,128 @@
|
|||
"""Pytest fixtures for testing."""
|
||||
|
||||
import json
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def patch_init(moulinette):
|
||||
"""Configure moulinette to use the YunoHost namespace."""
|
||||
old_init = moulinette.core.Moulinette18n.__init__
|
||||
|
||||
def monkey_path_i18n_init(self, package, default_locale='en'):
|
||||
old_init(self, package, default_locale)
|
||||
self.load_namespace('moulinette')
|
||||
|
||||
moulinette.core.Moulinette18n.__init__ = monkey_path_i18n_init
|
||||
|
||||
|
||||
def patch_translate(moulinette):
|
||||
"""Configure translator to raise errors when there are missing keys."""
|
||||
old_translate = moulinette.core.Translator.translate
|
||||
|
||||
def new_translate(self, key, *args, **kwargs):
|
||||
if key not in self._translations[self.default_locale].keys():
|
||||
message = 'Unable to retrieve key %s for default locale!' % key
|
||||
raise KeyError(message)
|
||||
|
||||
return old_translate(self, key, *args, **kwargs)
|
||||
|
||||
moulinette.core.Translator.translate = new_translate
|
||||
|
||||
def new_m18nn(self, key, *args, **kwargs):
|
||||
return self._global.translate(key, *args, **kwargs)
|
||||
|
||||
moulinette.core.Moulinette18n.g = new_m18nn
|
||||
|
||||
|
||||
def patch_logging(moulinette):
|
||||
"""Configure logging to use the custom logger."""
|
||||
handlers = set(['tty'])
|
||||
root_handlers = set(handlers)
|
||||
|
||||
level = 'INFO'
|
||||
tty_level = 'SUCCESS'
|
||||
|
||||
logging = {
|
||||
'version': 1,
|
||||
'disable_existing_loggers': True,
|
||||
'formatters': {
|
||||
'tty-debug': {
|
||||
'format': '%(relativeCreated)-4d %(fmessage)s'
|
||||
},
|
||||
'precise': {
|
||||
'format': '%(asctime)-15s %(levelname)-8s %(name)s %(funcName)s - %(fmessage)s' # noqa
|
||||
},
|
||||
},
|
||||
'filters': {
|
||||
'action': {
|
||||
'()': 'moulinette.utils.log.ActionFilter',
|
||||
},
|
||||
},
|
||||
'handlers': {
|
||||
'tty': {
|
||||
'level': tty_level,
|
||||
'class': 'moulinette.interfaces.cli.TTYHandler',
|
||||
'formatter': '',
|
||||
},
|
||||
},
|
||||
'loggers': {
|
||||
'moulinette': {
|
||||
'level': level,
|
||||
'handlers': [],
|
||||
'propagate': True,
|
||||
},
|
||||
'moulinette.interface': {
|
||||
'level': level,
|
||||
'handlers': handlers,
|
||||
'propagate': False,
|
||||
},
|
||||
},
|
||||
'root': {
|
||||
'level': level,
|
||||
'handlers': root_handlers,
|
||||
},
|
||||
}
|
||||
|
||||
moulinette.init(
|
||||
logging_config=logging,
|
||||
_from_source=False
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def moulinette(scope='session', autouse=True):
|
||||
import moulinette
|
||||
|
||||
patch_init(moulinette)
|
||||
patch_translate(moulinette)
|
||||
patch_logging(moulinette)
|
||||
|
||||
return moulinette
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_file(tmp_path):
|
||||
test_text = 'foo\nbar\n'
|
||||
test_file = tmp_path / 'test.txt'
|
||||
test_file.write_bytes(test_text)
|
||||
return test_file
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_json(tmp_path):
|
||||
test_json = json.dumps({'foo': 'bar'})
|
||||
test_file = tmp_path / 'test.json'
|
||||
test_file.write_bytes(test_json)
|
||||
return test_file
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def user():
|
||||
return os.getlogin()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_url():
|
||||
return 'https://some.test.url/yolo.txt'
|
177
test/test_filesystem.py
Normal file
177
test/test_filesystem.py
Normal file
|
@ -0,0 +1,177 @@
|
|||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
from moulinette import m18n
|
||||
from moulinette.core import MoulinetteError
|
||||
from moulinette.utils.filesystem import (append_to_file, read_file, read_json,
|
||||
rm, write_to_file, write_to_json)
|
||||
|
||||
|
||||
def test_read_file(test_file):
|
||||
content = read_file(str(test_file))
|
||||
assert content == 'foo\nbar\n'
|
||||
|
||||
|
||||
def test_read_file_missing_file():
|
||||
bad_file = 'doesnt-exist'
|
||||
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
read_file(bad_file)
|
||||
|
||||
translation = m18n.g('file_not_exist')
|
||||
expected_msg = translation.format(path=bad_file)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_read_file_cannot_read_ioerror(test_file, mocker):
|
||||
error = 'foobar'
|
||||
|
||||
with mocker.patch('__builtin__.open', side_effect=IOError(error)):
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
read_file(str(test_file))
|
||||
|
||||
translation = m18n.g('cannot_open_file')
|
||||
expected_msg = translation.format(file=str(test_file), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_read_json(test_json):
|
||||
content = read_json(str(test_json))
|
||||
assert 'foo' in content.keys()
|
||||
assert content['foo'] == 'bar'
|
||||
|
||||
|
||||
def test_read_json_cannot_read(test_json, mocker):
|
||||
error = 'foobar'
|
||||
|
||||
with mocker.patch('json.loads', side_effect=ValueError(error)):
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
read_json(str(test_json))
|
||||
|
||||
translation = m18n.g('corrupted_json')
|
||||
expected_msg = translation.format(ressource=str(test_json), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_write_to_existing_file(test_file):
|
||||
write_to_file(str(test_file), 'yolo\nswag')
|
||||
assert read_file(str(test_file)) == 'yolo\nswag'
|
||||
|
||||
|
||||
def test_write_to_new_file(tmp_path):
|
||||
new_file = tmp_path / 'newfile.txt'
|
||||
|
||||
write_to_file(str(new_file), 'yolo\nswag')
|
||||
|
||||
assert os.path.exists(str(new_file))
|
||||
assert read_file(str(new_file)) == 'yolo\nswag'
|
||||
|
||||
|
||||
def test_write_to_existing_file_bad_perms(test_file, mocker):
|
||||
error = 'foobar'
|
||||
|
||||
with mocker.patch('__builtin__.open', side_effect=IOError(error)):
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
write_to_file(str(test_file), 'yolo\nswag')
|
||||
|
||||
translation = m18n.g('cannot_write_file')
|
||||
expected_msg = translation.format(file=str(test_file), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_write_cannot_write_folder(tmp_path):
|
||||
with pytest.raises(AssertionError):
|
||||
write_to_file(str(tmp_path), 'yolo\nswag')
|
||||
|
||||
|
||||
def test_write_cannot_write_to_non_existant_folder():
|
||||
with pytest.raises(AssertionError):
|
||||
write_to_file('/toto/test', 'yolo\nswag')
|
||||
|
||||
|
||||
def test_write_to_file_with_a_list(test_file):
|
||||
write_to_file(str(test_file), ['yolo', 'swag'])
|
||||
assert read_file(str(test_file)) == 'yolo\nswag'
|
||||
|
||||
|
||||
def test_append_to_existing_file(test_file):
|
||||
append_to_file(str(test_file), 'yolo\nswag')
|
||||
assert read_file(str(test_file)) == 'foo\nbar\nyolo\nswag'
|
||||
|
||||
|
||||
def test_append_to_new_file(tmp_path):
|
||||
new_file = tmp_path / 'newfile.txt'
|
||||
|
||||
append_to_file(str(new_file), 'yolo\nswag')
|
||||
|
||||
assert os.path.exists(str(new_file))
|
||||
assert read_file(str(new_file)) == 'yolo\nswag'
|
||||
|
||||
|
||||
def text_write_dict_to_json(tmp_path):
|
||||
new_file = tmp_path / 'newfile.json'
|
||||
|
||||
dummy_dict = {'foo': 42, 'bar': ['a', 'b', 'c']}
|
||||
write_to_json(str(new_file), dummy_dict)
|
||||
_json = read_json(str(new_file))
|
||||
|
||||
assert 'foo' in _json.keys()
|
||||
assert 'bar' in _json.keys()
|
||||
|
||||
assert _json['foo'] == 42
|
||||
assert _json['bar'] == ['a', 'b', 'c']
|
||||
|
||||
|
||||
def text_write_list_to_json(tmp_path):
|
||||
new_file = tmp_path / 'newfile.json'
|
||||
|
||||
dummy_list = ['foo', 'bar', 'baz']
|
||||
write_to_json(str(new_file), dummy_list)
|
||||
|
||||
_json = read_json(str(new_file))
|
||||
assert _json == ['foo', 'bar', 'baz']
|
||||
|
||||
|
||||
def test_write_to_json_bad_perms(test_json, mocker):
|
||||
error = 'foobar'
|
||||
|
||||
with mocker.patch('__builtin__.open', side_effect=IOError(error)):
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
write_to_json(str(test_json), {'a': 1})
|
||||
|
||||
translation = m18n.g('cannot_write_file')
|
||||
expected_msg = translation.format(file=str(test_json), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_write_json_cannot_write_to_non_existant_folder():
|
||||
with pytest.raises(AssertionError):
|
||||
write_to_json('/toto/test.json', ['a', 'b'])
|
||||
|
||||
|
||||
def test_remove_file(test_file):
|
||||
assert os.path.exists(str(test_file))
|
||||
rm(str(test_file))
|
||||
assert not os.path.exists(str(test_file))
|
||||
|
||||
|
||||
def test_remove_file_bad_perms(test_file, mocker):
|
||||
error = 'foobar'
|
||||
|
||||
with mocker.patch('os.remove', side_effect=OSError(error)):
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
rm(str(test_file))
|
||||
|
||||
translation = m18n.g('error_removing')
|
||||
expected_msg = translation.format(path=str(test_file), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_remove_directory(tmp_path):
|
||||
test_dir = tmp_path / "foo"
|
||||
test_dir.mkdir()
|
||||
|
||||
assert os.path.exists(str(test_dir))
|
||||
rm(str(test_dir), recursive=True)
|
||||
assert not os.path.exists(str(test_dir))
|
56
test/test_network.py
Normal file
56
test/test_network.py
Normal file
|
@ -0,0 +1,56 @@
|
|||
import pytest
|
||||
import requests
|
||||
import requests_mock
|
||||
|
||||
from moulinette.core import MoulinetteError
|
||||
from moulinette.utils.network import download_json, download_text
|
||||
|
||||
|
||||
def test_download(test_url):
|
||||
with requests_mock.Mocker() as mock:
|
||||
mock.register_uri('GET', test_url, text='some text')
|
||||
fetched_text = download_text(test_url)
|
||||
assert fetched_text == 'some text'
|
||||
|
||||
|
||||
def test_download_bad_url():
|
||||
with pytest.raises(MoulinetteError):
|
||||
download_text('Nowhere')
|
||||
|
||||
|
||||
def test_download_404(test_url):
|
||||
with requests_mock.Mocker() as mock:
|
||||
mock.register_uri('GET', test_url, status_code=404)
|
||||
with pytest.raises(MoulinetteError):
|
||||
download_text(test_url)
|
||||
|
||||
|
||||
def test_download_ssl_error(test_url):
|
||||
with requests_mock.Mocker() as mock:
|
||||
exception = requests.exceptions.SSLError
|
||||
mock.register_uri('GET', test_url, exc=exception)
|
||||
with pytest.raises(MoulinetteError):
|
||||
download_text(test_url)
|
||||
|
||||
|
||||
def test_download_timeout(test_url):
|
||||
with requests_mock.Mocker() as mock:
|
||||
exception = requests.exceptions.ConnectTimeout
|
||||
mock.register_uri('GET', test_url, exc=exception)
|
||||
with pytest.raises(MoulinetteError):
|
||||
download_text(test_url)
|
||||
|
||||
|
||||
def test_download_json(test_url):
|
||||
with requests_mock.Mocker() as mock:
|
||||
mock.register_uri('GET', test_url, text='{"foo":"bar"}')
|
||||
fetched_json = download_json(test_url)
|
||||
assert 'foo' in fetched_json.keys()
|
||||
assert fetched_json['foo'] == 'bar'
|
||||
|
||||
|
||||
def test_download_json_bad_json(test_url):
|
||||
with requests_mock.Mocker() as mock:
|
||||
mock.register_uri('GET', test_url, text='notjsonlol')
|
||||
with pytest.raises(MoulinetteError):
|
||||
download_json(test_url)
|
17
test/test_process.py
Normal file
17
test/test_process.py
Normal file
|
@ -0,0 +1,17 @@
|
|||
import os
|
||||
from subprocess import CalledProcessError
|
||||
|
||||
import pytest
|
||||
|
||||
from moulinette.utils.process import run_commands
|
||||
|
||||
|
||||
def test_run_shell_command_list(test_file):
|
||||
assert os.path.exists(str(test_file))
|
||||
run_commands(['rm -f %s' % str(test_file)])
|
||||
assert not os.path.exists(str(test_file))
|
||||
|
||||
|
||||
def test_run_shell_bad_cmd():
|
||||
with pytest.raises(CalledProcessError):
|
||||
run_commands(['yolo swag'])
|
Loading…
Add table
Reference in a new issue