diff --git a/moulinette/utils/filesystem.py b/moulinette/utils/filesystem.py index 49e84c5b..b7580b2c 100644 --- a/moulinette/utils/filesystem.py +++ b/moulinette/utils/filesystem.py @@ -37,7 +37,7 @@ def read_file(file_path): file_content = f.read() except IOError as e: raise MoulinetteError("cannot_open_file", file=file_path, error=str(e)) - except Exception: + except Exception as e: raise MoulinetteError( "unknown_error_reading_file", file=file_path, error=str(e) ) @@ -100,9 +100,7 @@ def read_toml(file_path): try: loaded_toml = toml.loads(file_content, _dict=OrderedDict) except Exception as e: - raise MoulinetteError( - errno.EINVAL, m18n.g("corrupted_toml", ressource=file_path, error=str(e)) - ) + raise MoulinetteError("corrupted_toml", ressource=file_path, error=str(e)) return loaded_toml @@ -255,7 +253,7 @@ def write_to_yaml(file_path, data): raise MoulinetteError("error_writing_file", file=file_path, error=str(e)) -def mkdir(path, mode=0o777, parents=False, uid=None, gid=None, force=False): +def mkdir(path, mode=0o0777, parents=False, uid=None, gid=None, force=False): """Create a directory with optional features Create a directory and optionaly set its permissions to mode and its @@ -290,7 +288,9 @@ def mkdir(path, mode=0o777, parents=False, uid=None, gid=None, force=False): # Create directory and set permissions try: + oldmask = os.umask(000) os.mkdir(path, mode) + os.umask(oldmask) except OSError: # mimic Python3.2+ os.makedirs exist_ok behaviour if not force or not os.path.isdir(path): diff --git a/moulinette/utils/network.py b/moulinette/utils/network.py index 8a014e34..bddad986 100644 --- a/moulinette/utils/network.py +++ b/moulinette/utils/network.py @@ -22,12 +22,12 @@ def download_text(url, timeout=30, expected_status_code=200): # Download file try: r = requests.get(url, timeout=timeout) - # Invalid URL - except requests.exceptions.ConnectionError: - raise MoulinetteError("invalid_url", url=url) # SSL exceptions except requests.exceptions.SSLError: raise MoulinetteError("download_ssl_error", url=url) + # Invalid URL + except requests.exceptions.ConnectionError: + raise MoulinetteError("invalid_url", url=url) # Timeout exceptions except requests.exceptions.Timeout: raise MoulinetteError("download_timeout", url=url) diff --git a/test/conftest.py b/test/conftest.py index 309682ac..c26bd99d 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,5 +1,8 @@ """Pytest fixtures for testing.""" +import ldif +import toml +import yaml import json import os import shutil @@ -141,6 +144,36 @@ def test_json(tmp_path): return test_file +@pytest.fixture +def test_yaml(tmp_path): + test_yaml = yaml.dump({"foo": "bar"}) + test_file = tmp_path / "test.txt" + test_file.write_bytes(test_yaml) + return test_file + + +@pytest.fixture +def test_toml(tmp_path): + test_toml = toml.dumps({"foo": "bar"}) + test_file = tmp_path / "test.txt" + test_file.write_bytes(str(test_toml)) + return test_file + + +@pytest.fixture +def test_ldif(tmp_path): + test_file = tmp_path / "test.txt" + writer = ldif.LDIFWriter(open(str(test_file), 'wb')) + + writer.unparse('mail=alice@example.com', { + 'cn': ['Alice Alison'], + 'mail': ['alice@example.com'], + 'objectclass': ['top', 'person'] + }) + + return test_file + + @pytest.fixture def user(): return os.getlogin() diff --git a/test/test_filesystem.py b/test/test_filesystem.py index 56be370a..545574d0 100644 --- a/test/test_filesystem.py +++ b/test/test_filesystem.py @@ -1,6 +1,10 @@ import os import pytest +import pwd +import grp + +from stat import * from moulinette import m18n from moulinette.core import MoulinetteError @@ -8,9 +12,16 @@ from moulinette.utils.filesystem import ( append_to_file, read_file, read_json, + read_yaml, + read_toml, + read_ldif, rm, write_to_file, write_to_json, + write_to_yaml, + mkdir, + chown, + chmod, ) @@ -33,15 +44,27 @@ def test_read_file_missing_file(): def test_read_file_cannot_read_ioerror(test_file, mocker): error = "foobar" - with mocker.patch("__builtin__.open", side_effect=IOError(error)): - with pytest.raises(MoulinetteError) as exception: - read_file(str(test_file)) + mocker.patch("__builtin__.open", side_effect=IOError(error)) + with pytest.raises(MoulinetteError) as exception: + read_file(str(test_file)) translation = m18n.g("cannot_open_file", file=str(test_file), error=error) expected_msg = translation.format(file=str(test_file), error=error) assert expected_msg in str(exception) +def test_read_file_cannot_read_exception(test_file, mocker): + error = "foobar" + + mocker.patch("__builtin__.open", side_effect=Exception(error)) + with pytest.raises(MoulinetteError) as exception: + read_file(str(test_file)) + + translation = m18n.g("unknown_error_reading_file", file=str(test_file), error=error) + expected_msg = translation.format(file=str(test_file), error=error) + assert expected_msg in str(exception) + + def test_read_json(test_json): content = read_json(str(test_json)) assert "foo" in content.keys() @@ -51,15 +74,90 @@ def test_read_json(test_json): def test_read_json_cannot_read(test_json, mocker): error = "foobar" - with mocker.patch("json.loads", side_effect=ValueError(error)): - with pytest.raises(MoulinetteError) as exception: - read_json(str(test_json)) + mocker.patch("json.loads", side_effect=ValueError(error)) + with pytest.raises(MoulinetteError) as exception: + read_json(str(test_json)) translation = m18n.g("corrupted_json", ressource=str(test_json), error=error) expected_msg = translation.format(ressource=str(test_json), error=error) assert expected_msg in str(exception) +def test_read_yaml(test_yaml): + content = read_yaml(str(test_yaml)) + assert "foo" in content.keys() + assert content["foo"] == "bar" + + +def test_read_yaml_cannot_read(test_yaml, mocker): + error = "foobar" + + mocker.patch("yaml.safe_load", side_effect=Exception(error)) + with pytest.raises(MoulinetteError) as exception: + read_yaml(str(test_yaml)) + + translation = m18n.g("corrupted_yaml", ressource=str(test_yaml), error=error) + expected_msg = translation.format(ressource=str(test_yaml), error=error) + assert expected_msg in str(exception) + + +def test_read_toml(test_toml): + content = read_toml(str(test_toml)) + assert "foo" in content.keys() + assert content["foo"] == "bar" + + +def test_read_toml_cannot_read(test_toml, mocker): + error = "foobar" + + mocker.patch("toml.loads", side_effect=Exception(error)) + with pytest.raises(MoulinetteError) as exception: + read_toml(str(test_toml)) + + translation = m18n.g("corrupted_toml", ressource=str(test_toml), error=error) + expected_msg = translation.format(ressource=str(test_toml), error=error) + assert expected_msg in str(exception) + + +def test_read_ldif(test_ldif): + dn, entry = read_ldif(str(test_ldif))[0] + + assert dn == "mail=alice@example.com" + assert entry["mail"] == ["alice@example.com"] + assert entry["objectclass"] == ["top", "person"] + assert entry["cn"] == ["Alice Alison"] + + dn, entry = read_ldif(str(test_ldif), ["objectclass"])[0] + + assert dn == "mail=alice@example.com" + assert entry["mail"] == ["alice@example.com"] + assert "objectclass" not in entry + assert entry["cn"] == ["Alice Alison"] + + +def test_read_ldif_cannot_ioerror(test_ldif, mocker): + error = "foobar" + + mocker.patch("__builtin__.open", side_effect=IOError(error)) + with pytest.raises(MoulinetteError) as exception: + read_ldif(str(test_ldif)) + + translation = m18n.g("cannot_open_file", file=str(test_ldif), error=error) + expected_msg = translation.format(file=str(test_ldif), error=error) + assert expected_msg in str(exception) + + +def test_read_ldif_cannot_exception(test_ldif, mocker): + error = "foobar" + + mocker.patch("__builtin__.open", side_effect=Exception(error)) + with pytest.raises(MoulinetteError) as exception: + read_ldif(str(test_ldif)) + + translation = m18n.g("unknown_error_reading_file", file=str(test_ldif), error=error) + expected_msg = translation.format(file=str(test_ldif), error=error) + assert expected_msg in str(exception) + def test_write_to_existing_file(test_file): write_to_file(str(test_file), "yolo\nswag") assert read_file(str(test_file)) == "yolo\nswag" @@ -77,15 +175,27 @@ def test_write_to_new_file(tmp_path): def test_write_to_existing_file_bad_perms(test_file, mocker): error = "foobar" - with mocker.patch("__builtin__.open", side_effect=IOError(error)): - with pytest.raises(MoulinetteError) as exception: - write_to_file(str(test_file), "yolo\nswag") + mocker.patch("__builtin__.open", side_effect=IOError(error)) + with pytest.raises(MoulinetteError) as exception: + write_to_file(str(test_file), "yolo\nswag") translation = m18n.g("cannot_write_file", file=str(test_file), error=error) expected_msg = translation.format(file=str(test_file), error=error) assert expected_msg in str(exception) +def test_write_to_file_exception(test_file, mocker): + error = "foobar" + + mocker.patch("__builtin__.open", side_effect=Exception(error)) + with pytest.raises(MoulinetteError) as exception: + write_to_file(str(test_file), "yolo\nswag") + + translation = m18n.g("error_writing_file", file=str(test_file), error=error) + expected_msg = translation.format(file=str(test_file), error=error) + assert expected_msg in str(exception) + + def test_write_cannot_write_folder(tmp_path): with pytest.raises(AssertionError): write_to_file(str(tmp_path), "yolo\nswag") @@ -115,7 +225,7 @@ def test_append_to_new_file(tmp_path): assert read_file(str(new_file)) == "yolo\nswag" -def text_write_dict_to_json(tmp_path): +def test_write_dict_to_json(tmp_path): new_file = tmp_path / "newfile.json" dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]} @@ -129,6 +239,34 @@ def text_write_dict_to_json(tmp_path): assert _json["bar"] == ["a", "b", "c"] +def test_write_json_to_existing_file_bad_perms(test_file, mocker): + error = "foobar" + + dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]} + + mocker.patch("__builtin__.open", side_effect=IOError(error)) + with pytest.raises(MoulinetteError) as exception: + write_to_json(str(test_file), dummy_dict) + + translation = m18n.g("cannot_write_file", file=str(test_file), error=error) + expected_msg = translation.format(file=str(test_file), error=error) + assert expected_msg in str(exception) + + +def test_write_json_to_file_exception(test_file, mocker): + error = "foobar" + + dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]} + + mocker.patch("__builtin__.open", side_effect=Exception(error)) + with pytest.raises(MoulinetteError) as exception: + write_to_json(str(test_file), dummy_dict) + + translation = m18n.g("error_writing_file", file=str(test_file), error=error) + expected_msg = translation.format(file=str(test_file), error=error) + assert expected_msg in str(exception) + + def text_write_list_to_json(tmp_path): new_file = tmp_path / "newfile.json" @@ -142,9 +280,9 @@ def text_write_list_to_json(tmp_path): def test_write_to_json_bad_perms(test_json, mocker): error = "foobar" - with mocker.patch("__builtin__.open", side_effect=IOError(error)): - with pytest.raises(MoulinetteError) as exception: - write_to_json(str(test_json), {"a": 1}) + mocker.patch("__builtin__.open", side_effect=IOError(error)) + with pytest.raises(MoulinetteError) as exception: + write_to_json(str(test_json), {"a": 1}) translation = m18n.g("cannot_write_file", file=str(test_json), error=error) expected_msg = translation.format(file=str(test_json), error=error) @@ -156,6 +294,214 @@ def test_write_json_cannot_write_to_non_existant_folder(): write_to_json("/toto/test.json", ["a", "b"]) +def test_write_dict_to_yaml(tmp_path): + new_file = tmp_path / "newfile.yaml" + + dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]} + write_to_yaml(str(new_file), dummy_dict) + _yaml = read_yaml(str(new_file)) + + assert "foo" in _yaml.keys() + assert "bar" in _yaml.keys() + + assert _yaml["foo"] == 42 + assert _yaml["bar"] == ["a", "b", "c"] + + +def test_write_yaml_to_existing_file_bad_perms(test_file, mocker): + error = "foobar" + + dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]} + + mocker.patch("__builtin__.open", side_effect=IOError(error)) + with pytest.raises(MoulinetteError) as exception: + write_to_yaml(str(test_file), dummy_dict) + + translation = m18n.g("cannot_write_file", file=str(test_file), error=error) + expected_msg = translation.format(file=str(test_file), error=error) + assert expected_msg in str(exception) + + +def test_write_yaml_to_file_exception(test_file, mocker): + error = "foobar" + + dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]} + + mocker.patch("__builtin__.open", side_effect=Exception(error)) + with pytest.raises(MoulinetteError) as exception: + write_to_yaml(str(test_file), dummy_dict) + + translation = m18n.g("error_writing_file", file=str(test_file), error=error) + expected_msg = translation.format(file=str(test_file), error=error) + assert expected_msg in str(exception) + + +def text_write_list_to_yaml(tmp_path): + new_file = tmp_path / "newfile.yaml" + + dummy_list = ["foo", "bar", "baz"] + write_to_yaml(str(new_file), dummy_list) + + _yaml = read_yaml(str(new_file)) + assert _yaml == ["foo", "bar", "baz"] + + +def test_write_to_yaml_bad_perms(test_yaml, mocker): + error = "foobar" + + mocker.patch("__builtin__.open", side_effect=IOError(error)) + with pytest.raises(MoulinetteError) as exception: + write_to_yaml(str(test_yaml), {"a": 1}) + + translation = m18n.g("cannot_write_file", file=str(test_yaml), error=error) + expected_msg = translation.format(file=str(test_yaml), error=error) + assert expected_msg in str(exception) + + +def test_write_yaml_cannot_write_to_non_existant_folder(): + with pytest.raises(AssertionError): + write_to_yaml("/toto/test.yaml", ["a", "b"]) + + +def test_mkdir(tmp_path): + new_path = tmp_path / "new_folder" + mkdir(str(new_path)) + + assert os.path.isdir(str(new_path)) + assert oct(os.stat(str(new_path)).st_mode & 0o777) == oct(0o777) + + +def test_mkdir_with_permission(tmp_path, mocker): + new_path = tmp_path / "new_folder" + permission = 0o700 + mkdir(str(new_path), mode=permission) + + assert os.path.isdir(str(new_path)) + assert oct(os.stat(str(new_path)).st_mode & 0o777) == oct(permission) + + new_path = tmp_path / "new_parent2" / "new_folder" + + with pytest.raises(OSError): + mkdir(str(new_path), parents=True, mode=0o000) + + +def test_mkdir_with_parent(tmp_path): + new_path = tmp_path / "new_folder" + mkdir(str(new_path) + "/", parents=True) + + assert os.path.isdir(str(new_path)) + + new_path = tmp_path / "new_parent" / "new_folder" + mkdir(str(new_path), parents=True) + + assert os.path.isdir(str(new_path)) + + +def test_mkdir_existing_folder(tmp_path): + new_path = tmp_path / "new_folder" + os.makedirs(str(new_path)) + with pytest.raises(OSError): + mkdir(str(new_path)) + + +def test_chown(test_file): + with pytest.raises(ValueError): + chown(str(test_file)) + + current_uid = os.getuid() + current_gid = os.getgid() + chown(str(test_file), current_uid, current_gid) + + assert os.stat(str(test_file)).st_uid == current_uid + assert os.stat(str(test_file)).st_gid == current_gid + + current_gid = os.getgid() + chown(str(test_file), uid=None, gid=current_gid) + + assert os.stat(str(test_file)).st_gid == current_gid + + current_uid = pwd.getpwuid(os.getuid())[0] + current_gid = grp.getgrgid(os.getgid())[0] + chown(str(test_file), current_uid, current_gid) + + assert os.stat(str(test_file)).st_uid == os.getuid() + assert os.stat(str(test_file)).st_gid == os.getgid() + + fake_user = "nousrlol" + with pytest.raises(MoulinetteError) as exception: + chown(str(test_file), fake_user) + + translation = m18n.g("unknown_user", user=fake_user) + expected_msg = translation.format(user=fake_user) + assert expected_msg in str(exception) + + fake_grp = "nogrplol" + with pytest.raises(MoulinetteError) as exception: + chown(str(test_file), gid=fake_grp) + + translation = m18n.g("unknown_group", group=fake_grp) + expected_msg = translation.format(group=fake_grp) + assert expected_msg in str(exception) + + +def test_chown_recursive(test_file): + current_uid = os.getuid() + dirname = os.path.dirname(str(test_file)) + mkdir(os.path.join(dirname, "new_dir")) + chown(str(dirname), current_uid, recursive=True) + + assert os.stat(str(dirname)).st_uid == current_uid + + +def test_chown_exception(test_file, mocker): + error = "foobar" + + mocker.patch("os.chown", side_effect=Exception(error)) + with pytest.raises(MoulinetteError) as exception: + chown(str(test_file), 1) + + translation = m18n.g("error_changing_file_permissions", path=test_file, error=str(error)) + expected_msg = translation.format(path=test_file, error=str(error)) + assert expected_msg in str(exception) + + +def test_chmod(test_file): + permission = 0o723 + chmod(str(test_file), permission) + + assert oct(os.stat(str(test_file)).st_mode & 0o777) == oct(permission) + + dirname = os.path.dirname(str(test_file)) + permission = 0o722 + chmod(str(dirname), permission, recursive=True) + + assert oct(os.stat(str(test_file)).st_mode & 0o777) == oct(permission) + assert oct(os.stat(dirname).st_mode & 0o777) == oct(permission) + + +def test_chmod_recursive(test_file): + dirname = os.path.dirname(str(test_file)) + mkdir(os.path.join(dirname, "new_dir")) + permission = 0o721 + fpermission = 0o720 + chmod(str(dirname), permission, fmode=fpermission, recursive=True) + + assert oct(os.stat(str(test_file)).st_mode & 0o777) == oct(fpermission) + assert oct(os.stat(dirname).st_mode & 0o777) == oct(permission) + + +def test_chmod_exception(test_file, mocker): + error = "foobar" + + mocker.patch("os.chmod", side_effect=Exception(error)) + with pytest.raises(MoulinetteError) as exception: + chmod(str(test_file), 0o000) + + translation = m18n.g("error_changing_file_permissions", path=test_file, error=str(error)) + expected_msg = translation.format(path=test_file, error=str(error)) + assert expected_msg in str(exception) + + def test_remove_file(test_file): assert os.path.exists(str(test_file)) rm(str(test_file)) @@ -165,9 +511,9 @@ def test_remove_file(test_file): def test_remove_file_bad_perms(test_file, mocker): error = "foobar" - with mocker.patch("os.remove", side_effect=OSError(error)): - with pytest.raises(MoulinetteError) as exception: - rm(str(test_file)) + mocker.patch("os.remove", side_effect=OSError(error)) + with pytest.raises(MoulinetteError) as exception: + rm(str(test_file)) translation = m18n.g("error_removing", path=str(test_file), error=error) expected_msg = translation.format(path=str(test_file), error=error) diff --git a/test/test_network.py b/test/test_network.py index 1887d6f1..6d5c574d 100644 --- a/test/test_network.py +++ b/test/test_network.py @@ -33,9 +33,17 @@ def test_download_ssl_error(test_url): download_text(test_url) +def test_download_connection_error(test_url): + with requests_mock.Mocker() as mock: + exception = requests.exceptions.ConnectionError + mock.register_uri("GET", test_url, exc=exception) + with pytest.raises(MoulinetteError): + download_text(test_url) + + def test_download_timeout(test_url): with requests_mock.Mocker() as mock: - exception = requests.exceptions.ConnectTimeout + exception = requests.exceptions.Timeout mock.register_uri("GET", test_url, exc=exception) with pytest.raises(MoulinetteError): download_text(test_url) diff --git a/test/test_process.py b/test/test_process.py index 2565a732..e92dc747 100644 --- a/test/test_process.py +++ b/test/test_process.py @@ -4,6 +4,8 @@ from subprocess import CalledProcessError import pytest from moulinette.utils.process import run_commands +from moulinette.utils.process import call_async_output +from moulinette.utils.process import check_output def test_run_shell_command_list(test_file): @@ -15,3 +17,92 @@ def test_run_shell_command_list(test_file): def test_run_shell_bad_cmd(): with pytest.raises(CalledProcessError): run_commands(["yolo swag"]) + + +def test_run_shell_bad_cmd_with_callback(): + def callback(a, b, c): + assert isinstance(a, int) + assert isinstance(b, str) + assert isinstance(c, str) + return True + assert run_commands(["yolo swag", "yolo swag", "yolo swag"], callback=callback) == 3 + + def callback(a, b, c): + assert isinstance(a, int) + assert isinstance(b, str) + assert isinstance(c, str) + return False + assert run_commands(["yolo swag", "yolo swag"], callback=callback) == 1 + + def callback(a, b, c): + assert isinstance(a, int) + assert isinstance(b, str) + assert isinstance(c, tuple) + return True + run_commands(["yolo swag"], separate_stderr=True, callback=callback) + + +def test_run_shell_bad_callback(): + callback = 1 + with pytest.raises(ValueError): + run_commands(["ls"], callback=callback) + + +def test_run_shell_kwargs(): + with pytest.raises(ValueError): + run_commands([""], stdout="None") + + with pytest.raises(ValueError): + run_commands([""], stderr="None") + + run_commands(["ls"], cwd="/tmp") + + with pytest.raises(OSError): + run_commands(["ls"], cwd="/yoloswag") + + +def test_call_async_output(test_file): + + def callback(a): + assert a == "foo\n" or a == "bar\n" + call_async_output(["cat", str(test_file)], callback) + + with pytest.raises(ValueError): + call_async_output(["cat", str(test_file)], 1) + + def callbackA(a): + assert a == "foo\n" or a == "bar\n" + def callbackB(a): + pass + callback = (callbackA, callbackB) + call_async_output(["cat", str(test_file)], callback) + + +def test_call_async_output_kwargs(test_file): + def callback(a): + assert a == "foo\n" or a == "bar\n" + + with pytest.raises(ValueError): + call_async_output(["cat", str(test_file)], callback, stdout=None) + with pytest.raises(ValueError): + call_async_output(["cat", str(test_file)], callback, stderr=None) + + call_async_output(["cat", str(test_file)], callback, stdinfo=None) + + def callbackA(a): + assert a == "foo\n" or a == "bar\n" + def callbackB(a): + pass + def callbackC(a): + pass + + callback = (callbackA, callbackB, callbackC) + + os.mkdir("/tmp/teststdinfo/") + call_async_output(["cat", str(test_file)], callback, stdinfo="/tmp/teststdinfo/teststdinfo") + + +def test_check_output(test_file): + assert check_output(["cat", str(test_file)], shell=False) == "foo\nbar\n" + + assert check_output("cat %s" % str(test_file)) == "foo\nbar\n" diff --git a/test/test_text.py b/test/test_text.py index b7c667cd..a734b9cf 100644 --- a/test/test_text.py +++ b/test/test_text.py @@ -4,6 +4,7 @@ from moulinette.utils.text import search, searchf, prependlines, random_ascii def test_search(): assert search("a", "a a a") == ["a", "a", "a"] assert search("a", "a a a", count=2) == ["a", "a"] + assert search("a", "a a a", count=-1) == "a" assert not search("a", "c c d")