mirror of
https://github.com/YunoHost/moulinette.git
synced 2024-09-03 20:06:31 +02:00
Add more tests
This commit is contained in:
parent
54b8cab133
commit
6edc021f83
7 changed files with 504 additions and 25 deletions
|
@ -37,7 +37,7 @@ def read_file(file_path):
|
|||
file_content = f.read()
|
||||
except IOError as e:
|
||||
raise MoulinetteError("cannot_open_file", file=file_path, error=str(e))
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
raise MoulinetteError(
|
||||
"unknown_error_reading_file", file=file_path, error=str(e)
|
||||
)
|
||||
|
@ -100,9 +100,7 @@ def read_toml(file_path):
|
|||
try:
|
||||
loaded_toml = toml.loads(file_content, _dict=OrderedDict)
|
||||
except Exception as e:
|
||||
raise MoulinetteError(
|
||||
errno.EINVAL, m18n.g("corrupted_toml", ressource=file_path, error=str(e))
|
||||
)
|
||||
raise MoulinetteError("corrupted_toml", ressource=file_path, error=str(e))
|
||||
|
||||
return loaded_toml
|
||||
|
||||
|
@ -255,7 +253,7 @@ def write_to_yaml(file_path, data):
|
|||
raise MoulinetteError("error_writing_file", file=file_path, error=str(e))
|
||||
|
||||
|
||||
def mkdir(path, mode=0o777, parents=False, uid=None, gid=None, force=False):
|
||||
def mkdir(path, mode=0o0777, parents=False, uid=None, gid=None, force=False):
|
||||
"""Create a directory with optional features
|
||||
|
||||
Create a directory and optionaly set its permissions to mode and its
|
||||
|
@ -290,7 +288,9 @@ def mkdir(path, mode=0o777, parents=False, uid=None, gid=None, force=False):
|
|||
|
||||
# Create directory and set permissions
|
||||
try:
|
||||
oldmask = os.umask(000)
|
||||
os.mkdir(path, mode)
|
||||
os.umask(oldmask)
|
||||
except OSError:
|
||||
# mimic Python3.2+ os.makedirs exist_ok behaviour
|
||||
if not force or not os.path.isdir(path):
|
||||
|
|
|
@ -22,12 +22,12 @@ def download_text(url, timeout=30, expected_status_code=200):
|
|||
# Download file
|
||||
try:
|
||||
r = requests.get(url, timeout=timeout)
|
||||
# Invalid URL
|
||||
except requests.exceptions.ConnectionError:
|
||||
raise MoulinetteError("invalid_url", url=url)
|
||||
# SSL exceptions
|
||||
except requests.exceptions.SSLError:
|
||||
raise MoulinetteError("download_ssl_error", url=url)
|
||||
# Invalid URL
|
||||
except requests.exceptions.ConnectionError:
|
||||
raise MoulinetteError("invalid_url", url=url)
|
||||
# Timeout exceptions
|
||||
except requests.exceptions.Timeout:
|
||||
raise MoulinetteError("download_timeout", url=url)
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
"""Pytest fixtures for testing."""
|
||||
|
||||
import ldif
|
||||
import toml
|
||||
import yaml
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
|
@ -141,6 +144,36 @@ def test_json(tmp_path):
|
|||
return test_file
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_yaml(tmp_path):
|
||||
test_yaml = yaml.dump({"foo": "bar"})
|
||||
test_file = tmp_path / "test.txt"
|
||||
test_file.write_bytes(test_yaml)
|
||||
return test_file
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_toml(tmp_path):
|
||||
test_toml = toml.dumps({"foo": "bar"})
|
||||
test_file = tmp_path / "test.txt"
|
||||
test_file.write_bytes(str(test_toml))
|
||||
return test_file
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_ldif(tmp_path):
|
||||
test_file = tmp_path / "test.txt"
|
||||
writer = ldif.LDIFWriter(open(str(test_file), 'wb'))
|
||||
|
||||
writer.unparse('mail=alice@example.com', {
|
||||
'cn': ['Alice Alison'],
|
||||
'mail': ['alice@example.com'],
|
||||
'objectclass': ['top', 'person']
|
||||
})
|
||||
|
||||
return test_file
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def user():
|
||||
return os.getlogin()
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
import os
|
||||
|
||||
import pytest
|
||||
import pwd
|
||||
import grp
|
||||
|
||||
from stat import *
|
||||
|
||||
from moulinette import m18n
|
||||
from moulinette.core import MoulinetteError
|
||||
|
@ -8,9 +12,16 @@ from moulinette.utils.filesystem import (
|
|||
append_to_file,
|
||||
read_file,
|
||||
read_json,
|
||||
read_yaml,
|
||||
read_toml,
|
||||
read_ldif,
|
||||
rm,
|
||||
write_to_file,
|
||||
write_to_json,
|
||||
write_to_yaml,
|
||||
mkdir,
|
||||
chown,
|
||||
chmod,
|
||||
)
|
||||
|
||||
|
||||
|
@ -33,15 +44,27 @@ def test_read_file_missing_file():
|
|||
def test_read_file_cannot_read_ioerror(test_file, mocker):
|
||||
error = "foobar"
|
||||
|
||||
with mocker.patch("__builtin__.open", side_effect=IOError(error)):
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
read_file(str(test_file))
|
||||
mocker.patch("__builtin__.open", side_effect=IOError(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
read_file(str(test_file))
|
||||
|
||||
translation = m18n.g("cannot_open_file", file=str(test_file), error=error)
|
||||
expected_msg = translation.format(file=str(test_file), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_read_file_cannot_read_exception(test_file, mocker):
|
||||
error = "foobar"
|
||||
|
||||
mocker.patch("__builtin__.open", side_effect=Exception(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
read_file(str(test_file))
|
||||
|
||||
translation = m18n.g("unknown_error_reading_file", file=str(test_file), error=error)
|
||||
expected_msg = translation.format(file=str(test_file), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_read_json(test_json):
|
||||
content = read_json(str(test_json))
|
||||
assert "foo" in content.keys()
|
||||
|
@ -51,15 +74,90 @@ def test_read_json(test_json):
|
|||
def test_read_json_cannot_read(test_json, mocker):
|
||||
error = "foobar"
|
||||
|
||||
with mocker.patch("json.loads", side_effect=ValueError(error)):
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
read_json(str(test_json))
|
||||
mocker.patch("json.loads", side_effect=ValueError(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
read_json(str(test_json))
|
||||
|
||||
translation = m18n.g("corrupted_json", ressource=str(test_json), error=error)
|
||||
expected_msg = translation.format(ressource=str(test_json), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_read_yaml(test_yaml):
|
||||
content = read_yaml(str(test_yaml))
|
||||
assert "foo" in content.keys()
|
||||
assert content["foo"] == "bar"
|
||||
|
||||
|
||||
def test_read_yaml_cannot_read(test_yaml, mocker):
|
||||
error = "foobar"
|
||||
|
||||
mocker.patch("yaml.safe_load", side_effect=Exception(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
read_yaml(str(test_yaml))
|
||||
|
||||
translation = m18n.g("corrupted_yaml", ressource=str(test_yaml), error=error)
|
||||
expected_msg = translation.format(ressource=str(test_yaml), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_read_toml(test_toml):
|
||||
content = read_toml(str(test_toml))
|
||||
assert "foo" in content.keys()
|
||||
assert content["foo"] == "bar"
|
||||
|
||||
|
||||
def test_read_toml_cannot_read(test_toml, mocker):
|
||||
error = "foobar"
|
||||
|
||||
mocker.patch("toml.loads", side_effect=Exception(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
read_toml(str(test_toml))
|
||||
|
||||
translation = m18n.g("corrupted_toml", ressource=str(test_toml), error=error)
|
||||
expected_msg = translation.format(ressource=str(test_toml), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_read_ldif(test_ldif):
|
||||
dn, entry = read_ldif(str(test_ldif))[0]
|
||||
|
||||
assert dn == "mail=alice@example.com"
|
||||
assert entry["mail"] == ["alice@example.com"]
|
||||
assert entry["objectclass"] == ["top", "person"]
|
||||
assert entry["cn"] == ["Alice Alison"]
|
||||
|
||||
dn, entry = read_ldif(str(test_ldif), ["objectclass"])[0]
|
||||
|
||||
assert dn == "mail=alice@example.com"
|
||||
assert entry["mail"] == ["alice@example.com"]
|
||||
assert "objectclass" not in entry
|
||||
assert entry["cn"] == ["Alice Alison"]
|
||||
|
||||
|
||||
def test_read_ldif_cannot_ioerror(test_ldif, mocker):
|
||||
error = "foobar"
|
||||
|
||||
mocker.patch("__builtin__.open", side_effect=IOError(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
read_ldif(str(test_ldif))
|
||||
|
||||
translation = m18n.g("cannot_open_file", file=str(test_ldif), error=error)
|
||||
expected_msg = translation.format(file=str(test_ldif), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_read_ldif_cannot_exception(test_ldif, mocker):
|
||||
error = "foobar"
|
||||
|
||||
mocker.patch("__builtin__.open", side_effect=Exception(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
read_ldif(str(test_ldif))
|
||||
|
||||
translation = m18n.g("unknown_error_reading_file", file=str(test_ldif), error=error)
|
||||
expected_msg = translation.format(file=str(test_ldif), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
def test_write_to_existing_file(test_file):
|
||||
write_to_file(str(test_file), "yolo\nswag")
|
||||
assert read_file(str(test_file)) == "yolo\nswag"
|
||||
|
@ -77,15 +175,27 @@ def test_write_to_new_file(tmp_path):
|
|||
def test_write_to_existing_file_bad_perms(test_file, mocker):
|
||||
error = "foobar"
|
||||
|
||||
with mocker.patch("__builtin__.open", side_effect=IOError(error)):
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
write_to_file(str(test_file), "yolo\nswag")
|
||||
mocker.patch("__builtin__.open", side_effect=IOError(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
write_to_file(str(test_file), "yolo\nswag")
|
||||
|
||||
translation = m18n.g("cannot_write_file", file=str(test_file), error=error)
|
||||
expected_msg = translation.format(file=str(test_file), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_write_to_file_exception(test_file, mocker):
|
||||
error = "foobar"
|
||||
|
||||
mocker.patch("__builtin__.open", side_effect=Exception(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
write_to_file(str(test_file), "yolo\nswag")
|
||||
|
||||
translation = m18n.g("error_writing_file", file=str(test_file), error=error)
|
||||
expected_msg = translation.format(file=str(test_file), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_write_cannot_write_folder(tmp_path):
|
||||
with pytest.raises(AssertionError):
|
||||
write_to_file(str(tmp_path), "yolo\nswag")
|
||||
|
@ -115,7 +225,7 @@ def test_append_to_new_file(tmp_path):
|
|||
assert read_file(str(new_file)) == "yolo\nswag"
|
||||
|
||||
|
||||
def text_write_dict_to_json(tmp_path):
|
||||
def test_write_dict_to_json(tmp_path):
|
||||
new_file = tmp_path / "newfile.json"
|
||||
|
||||
dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]}
|
||||
|
@ -129,6 +239,34 @@ def text_write_dict_to_json(tmp_path):
|
|||
assert _json["bar"] == ["a", "b", "c"]
|
||||
|
||||
|
||||
def test_write_json_to_existing_file_bad_perms(test_file, mocker):
|
||||
error = "foobar"
|
||||
|
||||
dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]}
|
||||
|
||||
mocker.patch("__builtin__.open", side_effect=IOError(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
write_to_json(str(test_file), dummy_dict)
|
||||
|
||||
translation = m18n.g("cannot_write_file", file=str(test_file), error=error)
|
||||
expected_msg = translation.format(file=str(test_file), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_write_json_to_file_exception(test_file, mocker):
|
||||
error = "foobar"
|
||||
|
||||
dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]}
|
||||
|
||||
mocker.patch("__builtin__.open", side_effect=Exception(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
write_to_json(str(test_file), dummy_dict)
|
||||
|
||||
translation = m18n.g("error_writing_file", file=str(test_file), error=error)
|
||||
expected_msg = translation.format(file=str(test_file), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def text_write_list_to_json(tmp_path):
|
||||
new_file = tmp_path / "newfile.json"
|
||||
|
||||
|
@ -142,9 +280,9 @@ def text_write_list_to_json(tmp_path):
|
|||
def test_write_to_json_bad_perms(test_json, mocker):
|
||||
error = "foobar"
|
||||
|
||||
with mocker.patch("__builtin__.open", side_effect=IOError(error)):
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
write_to_json(str(test_json), {"a": 1})
|
||||
mocker.patch("__builtin__.open", side_effect=IOError(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
write_to_json(str(test_json), {"a": 1})
|
||||
|
||||
translation = m18n.g("cannot_write_file", file=str(test_json), error=error)
|
||||
expected_msg = translation.format(file=str(test_json), error=error)
|
||||
|
@ -156,6 +294,214 @@ def test_write_json_cannot_write_to_non_existant_folder():
|
|||
write_to_json("/toto/test.json", ["a", "b"])
|
||||
|
||||
|
||||
def test_write_dict_to_yaml(tmp_path):
|
||||
new_file = tmp_path / "newfile.yaml"
|
||||
|
||||
dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]}
|
||||
write_to_yaml(str(new_file), dummy_dict)
|
||||
_yaml = read_yaml(str(new_file))
|
||||
|
||||
assert "foo" in _yaml.keys()
|
||||
assert "bar" in _yaml.keys()
|
||||
|
||||
assert _yaml["foo"] == 42
|
||||
assert _yaml["bar"] == ["a", "b", "c"]
|
||||
|
||||
|
||||
def test_write_yaml_to_existing_file_bad_perms(test_file, mocker):
|
||||
error = "foobar"
|
||||
|
||||
dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]}
|
||||
|
||||
mocker.patch("__builtin__.open", side_effect=IOError(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
write_to_yaml(str(test_file), dummy_dict)
|
||||
|
||||
translation = m18n.g("cannot_write_file", file=str(test_file), error=error)
|
||||
expected_msg = translation.format(file=str(test_file), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_write_yaml_to_file_exception(test_file, mocker):
|
||||
error = "foobar"
|
||||
|
||||
dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]}
|
||||
|
||||
mocker.patch("__builtin__.open", side_effect=Exception(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
write_to_yaml(str(test_file), dummy_dict)
|
||||
|
||||
translation = m18n.g("error_writing_file", file=str(test_file), error=error)
|
||||
expected_msg = translation.format(file=str(test_file), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def text_write_list_to_yaml(tmp_path):
|
||||
new_file = tmp_path / "newfile.yaml"
|
||||
|
||||
dummy_list = ["foo", "bar", "baz"]
|
||||
write_to_yaml(str(new_file), dummy_list)
|
||||
|
||||
_yaml = read_yaml(str(new_file))
|
||||
assert _yaml == ["foo", "bar", "baz"]
|
||||
|
||||
|
||||
def test_write_to_yaml_bad_perms(test_yaml, mocker):
|
||||
error = "foobar"
|
||||
|
||||
mocker.patch("__builtin__.open", side_effect=IOError(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
write_to_yaml(str(test_yaml), {"a": 1})
|
||||
|
||||
translation = m18n.g("cannot_write_file", file=str(test_yaml), error=error)
|
||||
expected_msg = translation.format(file=str(test_yaml), error=error)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_write_yaml_cannot_write_to_non_existant_folder():
|
||||
with pytest.raises(AssertionError):
|
||||
write_to_yaml("/toto/test.yaml", ["a", "b"])
|
||||
|
||||
|
||||
def test_mkdir(tmp_path):
|
||||
new_path = tmp_path / "new_folder"
|
||||
mkdir(str(new_path))
|
||||
|
||||
assert os.path.isdir(str(new_path))
|
||||
assert oct(os.stat(str(new_path)).st_mode & 0o777) == oct(0o777)
|
||||
|
||||
|
||||
def test_mkdir_with_permission(tmp_path, mocker):
|
||||
new_path = tmp_path / "new_folder"
|
||||
permission = 0o700
|
||||
mkdir(str(new_path), mode=permission)
|
||||
|
||||
assert os.path.isdir(str(new_path))
|
||||
assert oct(os.stat(str(new_path)).st_mode & 0o777) == oct(permission)
|
||||
|
||||
new_path = tmp_path / "new_parent2" / "new_folder"
|
||||
|
||||
with pytest.raises(OSError):
|
||||
mkdir(str(new_path), parents=True, mode=0o000)
|
||||
|
||||
|
||||
def test_mkdir_with_parent(tmp_path):
|
||||
new_path = tmp_path / "new_folder"
|
||||
mkdir(str(new_path) + "/", parents=True)
|
||||
|
||||
assert os.path.isdir(str(new_path))
|
||||
|
||||
new_path = tmp_path / "new_parent" / "new_folder"
|
||||
mkdir(str(new_path), parents=True)
|
||||
|
||||
assert os.path.isdir(str(new_path))
|
||||
|
||||
|
||||
def test_mkdir_existing_folder(tmp_path):
|
||||
new_path = tmp_path / "new_folder"
|
||||
os.makedirs(str(new_path))
|
||||
with pytest.raises(OSError):
|
||||
mkdir(str(new_path))
|
||||
|
||||
|
||||
def test_chown(test_file):
|
||||
with pytest.raises(ValueError):
|
||||
chown(str(test_file))
|
||||
|
||||
current_uid = os.getuid()
|
||||
current_gid = os.getgid()
|
||||
chown(str(test_file), current_uid, current_gid)
|
||||
|
||||
assert os.stat(str(test_file)).st_uid == current_uid
|
||||
assert os.stat(str(test_file)).st_gid == current_gid
|
||||
|
||||
current_gid = os.getgid()
|
||||
chown(str(test_file), uid=None, gid=current_gid)
|
||||
|
||||
assert os.stat(str(test_file)).st_gid == current_gid
|
||||
|
||||
current_uid = pwd.getpwuid(os.getuid())[0]
|
||||
current_gid = grp.getgrgid(os.getgid())[0]
|
||||
chown(str(test_file), current_uid, current_gid)
|
||||
|
||||
assert os.stat(str(test_file)).st_uid == os.getuid()
|
||||
assert os.stat(str(test_file)).st_gid == os.getgid()
|
||||
|
||||
fake_user = "nousrlol"
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
chown(str(test_file), fake_user)
|
||||
|
||||
translation = m18n.g("unknown_user", user=fake_user)
|
||||
expected_msg = translation.format(user=fake_user)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
fake_grp = "nogrplol"
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
chown(str(test_file), gid=fake_grp)
|
||||
|
||||
translation = m18n.g("unknown_group", group=fake_grp)
|
||||
expected_msg = translation.format(group=fake_grp)
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_chown_recursive(test_file):
|
||||
current_uid = os.getuid()
|
||||
dirname = os.path.dirname(str(test_file))
|
||||
mkdir(os.path.join(dirname, "new_dir"))
|
||||
chown(str(dirname), current_uid, recursive=True)
|
||||
|
||||
assert os.stat(str(dirname)).st_uid == current_uid
|
||||
|
||||
|
||||
def test_chown_exception(test_file, mocker):
|
||||
error = "foobar"
|
||||
|
||||
mocker.patch("os.chown", side_effect=Exception(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
chown(str(test_file), 1)
|
||||
|
||||
translation = m18n.g("error_changing_file_permissions", path=test_file, error=str(error))
|
||||
expected_msg = translation.format(path=test_file, error=str(error))
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_chmod(test_file):
|
||||
permission = 0o723
|
||||
chmod(str(test_file), permission)
|
||||
|
||||
assert oct(os.stat(str(test_file)).st_mode & 0o777) == oct(permission)
|
||||
|
||||
dirname = os.path.dirname(str(test_file))
|
||||
permission = 0o722
|
||||
chmod(str(dirname), permission, recursive=True)
|
||||
|
||||
assert oct(os.stat(str(test_file)).st_mode & 0o777) == oct(permission)
|
||||
assert oct(os.stat(dirname).st_mode & 0o777) == oct(permission)
|
||||
|
||||
|
||||
def test_chmod_recursive(test_file):
|
||||
dirname = os.path.dirname(str(test_file))
|
||||
mkdir(os.path.join(dirname, "new_dir"))
|
||||
permission = 0o721
|
||||
fpermission = 0o720
|
||||
chmod(str(dirname), permission, fmode=fpermission, recursive=True)
|
||||
|
||||
assert oct(os.stat(str(test_file)).st_mode & 0o777) == oct(fpermission)
|
||||
assert oct(os.stat(dirname).st_mode & 0o777) == oct(permission)
|
||||
|
||||
|
||||
def test_chmod_exception(test_file, mocker):
|
||||
error = "foobar"
|
||||
|
||||
mocker.patch("os.chmod", side_effect=Exception(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
chmod(str(test_file), 0o000)
|
||||
|
||||
translation = m18n.g("error_changing_file_permissions", path=test_file, error=str(error))
|
||||
expected_msg = translation.format(path=test_file, error=str(error))
|
||||
assert expected_msg in str(exception)
|
||||
|
||||
|
||||
def test_remove_file(test_file):
|
||||
assert os.path.exists(str(test_file))
|
||||
rm(str(test_file))
|
||||
|
@ -165,9 +511,9 @@ def test_remove_file(test_file):
|
|||
def test_remove_file_bad_perms(test_file, mocker):
|
||||
error = "foobar"
|
||||
|
||||
with mocker.patch("os.remove", side_effect=OSError(error)):
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
rm(str(test_file))
|
||||
mocker.patch("os.remove", side_effect=OSError(error))
|
||||
with pytest.raises(MoulinetteError) as exception:
|
||||
rm(str(test_file))
|
||||
|
||||
translation = m18n.g("error_removing", path=str(test_file), error=error)
|
||||
expected_msg = translation.format(path=str(test_file), error=error)
|
||||
|
|
|
@ -33,9 +33,17 @@ def test_download_ssl_error(test_url):
|
|||
download_text(test_url)
|
||||
|
||||
|
||||
def test_download_connection_error(test_url):
|
||||
with requests_mock.Mocker() as mock:
|
||||
exception = requests.exceptions.ConnectionError
|
||||
mock.register_uri("GET", test_url, exc=exception)
|
||||
with pytest.raises(MoulinetteError):
|
||||
download_text(test_url)
|
||||
|
||||
|
||||
def test_download_timeout(test_url):
|
||||
with requests_mock.Mocker() as mock:
|
||||
exception = requests.exceptions.ConnectTimeout
|
||||
exception = requests.exceptions.Timeout
|
||||
mock.register_uri("GET", test_url, exc=exception)
|
||||
with pytest.raises(MoulinetteError):
|
||||
download_text(test_url)
|
||||
|
|
|
@ -4,6 +4,8 @@ from subprocess import CalledProcessError
|
|||
import pytest
|
||||
|
||||
from moulinette.utils.process import run_commands
|
||||
from moulinette.utils.process import call_async_output
|
||||
from moulinette.utils.process import check_output
|
||||
|
||||
|
||||
def test_run_shell_command_list(test_file):
|
||||
|
@ -15,3 +17,92 @@ def test_run_shell_command_list(test_file):
|
|||
def test_run_shell_bad_cmd():
|
||||
with pytest.raises(CalledProcessError):
|
||||
run_commands(["yolo swag"])
|
||||
|
||||
|
||||
def test_run_shell_bad_cmd_with_callback():
|
||||
def callback(a, b, c):
|
||||
assert isinstance(a, int)
|
||||
assert isinstance(b, str)
|
||||
assert isinstance(c, str)
|
||||
return True
|
||||
assert run_commands(["yolo swag", "yolo swag", "yolo swag"], callback=callback) == 3
|
||||
|
||||
def callback(a, b, c):
|
||||
assert isinstance(a, int)
|
||||
assert isinstance(b, str)
|
||||
assert isinstance(c, str)
|
||||
return False
|
||||
assert run_commands(["yolo swag", "yolo swag"], callback=callback) == 1
|
||||
|
||||
def callback(a, b, c):
|
||||
assert isinstance(a, int)
|
||||
assert isinstance(b, str)
|
||||
assert isinstance(c, tuple)
|
||||
return True
|
||||
run_commands(["yolo swag"], separate_stderr=True, callback=callback)
|
||||
|
||||
|
||||
def test_run_shell_bad_callback():
|
||||
callback = 1
|
||||
with pytest.raises(ValueError):
|
||||
run_commands(["ls"], callback=callback)
|
||||
|
||||
|
||||
def test_run_shell_kwargs():
|
||||
with pytest.raises(ValueError):
|
||||
run_commands([""], stdout="None")
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
run_commands([""], stderr="None")
|
||||
|
||||
run_commands(["ls"], cwd="/tmp")
|
||||
|
||||
with pytest.raises(OSError):
|
||||
run_commands(["ls"], cwd="/yoloswag")
|
||||
|
||||
|
||||
def test_call_async_output(test_file):
|
||||
|
||||
def callback(a):
|
||||
assert a == "foo\n" or a == "bar\n"
|
||||
call_async_output(["cat", str(test_file)], callback)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
call_async_output(["cat", str(test_file)], 1)
|
||||
|
||||
def callbackA(a):
|
||||
assert a == "foo\n" or a == "bar\n"
|
||||
def callbackB(a):
|
||||
pass
|
||||
callback = (callbackA, callbackB)
|
||||
call_async_output(["cat", str(test_file)], callback)
|
||||
|
||||
|
||||
def test_call_async_output_kwargs(test_file):
|
||||
def callback(a):
|
||||
assert a == "foo\n" or a == "bar\n"
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
call_async_output(["cat", str(test_file)], callback, stdout=None)
|
||||
with pytest.raises(ValueError):
|
||||
call_async_output(["cat", str(test_file)], callback, stderr=None)
|
||||
|
||||
call_async_output(["cat", str(test_file)], callback, stdinfo=None)
|
||||
|
||||
def callbackA(a):
|
||||
assert a == "foo\n" or a == "bar\n"
|
||||
def callbackB(a):
|
||||
pass
|
||||
def callbackC(a):
|
||||
pass
|
||||
|
||||
callback = (callbackA, callbackB, callbackC)
|
||||
|
||||
os.mkdir("/tmp/teststdinfo/")
|
||||
call_async_output(["cat", str(test_file)], callback, stdinfo="/tmp/teststdinfo/teststdinfo")
|
||||
|
||||
|
||||
def test_check_output(test_file):
|
||||
assert check_output(["cat", str(test_file)], shell=False) == "foo\nbar\n"
|
||||
|
||||
assert check_output("cat %s" % str(test_file)) == "foo\nbar\n"
|
||||
|
|
|
@ -4,6 +4,7 @@ from moulinette.utils.text import search, searchf, prependlines, random_ascii
|
|||
def test_search():
|
||||
assert search("a", "a a a") == ["a", "a", "a"]
|
||||
assert search("a", "a a a", count=2) == ["a", "a"]
|
||||
assert search("a", "a a a", count=-1) == "a"
|
||||
assert not search("a", "c c d")
|
||||
|
||||
|
||||
|
|
Loading…
Reference in a new issue