From c9967372f16700dca6af0b8a12e0facf833c6b47 Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Fri, 21 Jul 2017 21:05:46 -0400 Subject: [PATCH] [enh] More helpers for common IO operations (#141) --- locales/en.json | 15 +- moulinette/utils/filesystem.py | 180 +++++++++++-- moulinette/utils/network.py | 61 +++++ moulinette/utils/process.py | 31 ++- moulinette/utils/tests/conftest.py | 96 +++++++ moulinette/utils/tests/test_filesystem.py | 299 ++++++++++++++++++++++ moulinette/utils/tests/test_network.py | 92 +++++++ moulinette/utils/tests/test_process.py | 66 +++++ 8 files changed, 806 insertions(+), 34 deletions(-) create mode 100644 moulinette/utils/network.py create mode 100644 moulinette/utils/tests/conftest.py create mode 100644 moulinette/utils/tests/test_filesystem.py create mode 100644 moulinette/utils/tests/test_network.py create mode 100644 moulinette/utils/tests/test_process.py diff --git a/locales/en.json b/locales/en.json index 93f2d730..6eda60ca 100644 --- a/locales/en.json +++ b/locales/en.json @@ -36,5 +36,18 @@ "unknown_user": "Unknown '{user}' user", "values_mismatch": "Values don't match", "warning": "Warning:", - "websocket_request_expected": "Expected a WebSocket request" + "websocket_request_expected": "Expected a WebSocket request", + "cannot_open_file": "Could not open file {file:s} (reason: {error:s})", + "cannot_write_file": "Could not write file {file:s} (reason: {error:s})", + "unknown_error_reading_file": "Unknown error while trying to read file {file:s}", + "corrupted_json": "Corrupted json read from {ressource:s} (reason: {error:s})", + "error_writing_file": "Error when writing file {file:s}: {error:s}", + "error_removing": "Error when removing {path:s}: {error:s}", + "error_changing_file_permissions": "Error when changing permissions for {path:s}: {error:s}", + "invalid_url": "Invalid url {url:s} (does this site exists ?)", + "download_ssl_error": "SSL error when connecting to {url:s}", + "download_timeout": "{url:s} took too long to answer, gave up.", + "download_unknown_error": "Error when downloading data from {url:s} : {error:s}", + "download_bad_status_code": "{url:s} returned status code {code:s}", + "command_unknown": "Command '{command:s}' unknown ?" } diff --git a/moulinette/utils/filesystem.py b/moulinette/utils/filesystem.py index 182a4b30..737bdd18 100644 --- a/moulinette/utils/filesystem.py +++ b/moulinette/utils/filesystem.py @@ -1,13 +1,140 @@ import os import errno import shutil +import json +import grp from pwd import getpwnam from moulinette.core import MoulinetteError - # Files & directories -------------------------------------------------- +def read_file(file_path): + """ + Read a regular text file + + Keyword argument: + file_path -- Path to the text file + """ + assert isinstance(file_path, basestring) + + # Check file exists + if not os.path.isfile(file_path): + raise MoulinetteError(errno.ENOENT, + m18n.g('file_not_exist', path=file_path)) + + # Open file and read content + try: + with open(file_path, "r") as f: + file_content = f.read() + except IOError as e: + raise MoulinetteError(errno.EACCES, + m18n.g('cannot_open_file', + file=file_path, error=str(e))) + except Exception as e: + raise MoulinetteError(errno.EIO, + m18n.g('error_reading_file', + file=file_path, error=str(e))) + + return file_content + + +def read_json(file_path): + """ + Read a json file + + Keyword argument: + file_path -- Path to the json file + """ + + # Read file + file_content = read_file(file_path) + + # Try to load json to check if it's syntaxically correct + try: + loaded_json = json.loads(file_content) + except ValueError as e: + raise MoulinetteError(errno.EINVAL, + m18n.g('corrupted_json', + ressource=file_path, error=str(e))) + + return loaded_json + + +def write_to_file(file_path, data, file_mode="w"): + """ + Write a single string or a list of string to a text file. + The text file will be overwritten by default. + + Keyword argument: + file_path -- Path to the output file + data -- The data to write (must be a string or list of string) + file_mode -- Mode used when writing the file. Option meant to be used + by append_to_file to avoid duplicating the code of this function. + """ + assert isinstance(data, basestring) or isinstance(data, list) + assert not os.path.isdir(file_path) + assert os.path.isdir(os.path.dirname(file_path)) + + # If data is a list, check elements are strings and build a single string + if not isinstance(data, basestring): + for element in data: + assert isinstance(element, basestring) + data = '\n'.join(data) + + try: + with open(file_path, file_mode) as f: + f.write(data) + except IOError as e: + raise MoulinetteError(errno.EACCES, + m18n.g('cannot_write_file', + file=file_path, error=str(e))) + except Exception as e: + raise MoulinetteError(errno.EIO, + m18n.g('error_writing_file', + file=file_path, error=str(e))) + +def append_to_file(file_path, data): + """ + Append a single string or a list of string to a text file. + + Keyword argument: + file_path -- Path to the output file + data -- The data to write (must be a string or list of string) + """ + + write_to_file(file_path, data, file_mode="a") + + +def write_to_json(file_path, data): + """ + Write a dictionnary or a list to a json file + + Keyword argument: + file_path -- Path to the output json file + data -- The data to write (must be a dict or a list) + """ + + # Assumptions + assert isinstance(file_path, basestring) + assert isinstance(data, dict) or isinstance(data, list) + assert not os.path.isdir(file_path) + assert os.path.isdir(os.path.dirname(file_path)) + + # Write dict to file + try: + with open(file_path, "w") as f: + json.dump(data, f) + except IOError as e: + raise MoulinetteError(errno.EACCES, + m18n.g('cannot_write_file', + file=file_path, error=str(e))) + except Exception as e: + raise MoulinetteError(errno.EIO, + m18n.g('_error_writing_file', + file=file_path, error=str(e))) + + def mkdir(path, mode=0777, parents=False, uid=None, gid=None, force=False): """Create a directory with optional features @@ -70,20 +197,25 @@ def chown(path, uid=None, gid=None, recursive=False): uid = -1 if isinstance(gid, basestring): try: - gid = getpwnam(gid).gr_gid + gid = grp.getgrnam(gid).gr_gid except KeyError: raise MoulinetteError(errno.EINVAL, m18n.g('unknown_group', group=gid)) elif gid is None: gid = -1 - os.chown(path, uid, gid) - if recursive and os.path.isdir(path): - for root, dirs, files in os.walk(path): - for d in dirs: - os.chown(os.path.join(root, d), uid, gid) - for f in files: - os.chown(os.path.join(root, f), uid, gid) + try: + os.chown(path, uid, gid) + if recursive and os.path.isdir(path): + for root, dirs, files in os.walk(path): + for d in dirs: + os.chown(os.path.join(root, d), uid, gid) + for f in files: + os.chown(os.path.join(root, f), uid, gid) + except Exception as e: + raise MoulinetteError(errno.EIO, + m18n.g('error_changing_file_permissions', + path=path, error=str(e))) def chmod(path, mode, fmode=None, recursive=False): @@ -95,15 +227,21 @@ def chmod(path, mode, fmode=None, recursive=False): - recursive -- Operate on path recursively """ - os.chmod(path, mode) - if recursive and os.path.isdir(path): - if fmode is None: - fmode = mode - for root, dirs, files in os.walk(path): - for d in dirs: - os.chmod(os.path.join(root, d), mode) - for f in files: - os.chmod(os.path.join(root, f), fmode) + + try: + os.chmod(path, mode) + if recursive and os.path.isdir(path): + if fmode is None: + fmode = mode + for root, dirs, files in os.walk(path): + for d in dirs: + os.chmod(os.path.join(root, d), mode) + for f in files: + os.chmod(os.path.join(root, f), fmode) + except Exception as e: + raise MoulinetteError(errno.EIO, + m18n.g('error_changing_file_permissions', + path=path, error=str(e))) def rm(path, recursive=False, force=False): @@ -120,6 +258,8 @@ def rm(path, recursive=False, force=False): else: try: os.remove(path) - except OSError: + except OSError as e: if not force: - raise + raise MoulinetteError(errno.EIO, + m18n.g('error_removing', + path=path, error=str(e))) diff --git a/moulinette/utils/network.py b/moulinette/utils/network.py new file mode 100644 index 00000000..b5c75241 --- /dev/null +++ b/moulinette/utils/network.py @@ -0,0 +1,61 @@ +import errno +import requests +import json + +from moulinette.core import MoulinetteError + + +def download_text(url, timeout=30): + """ + Download text from a url and returns the raw text + + Keyword argument: + url -- The url to download the data from + timeout -- Number of seconds allowed for download to effectively start + before giving up + """ + # Assumptions + assert isinstance(url, str) + + # Download file + try: + r = requests.get(url, timeout=timeout) + # Invalid URL + except requests.exceptions.ConnectionError: + raise MoulinetteError(errno.EBADE, + m18n.g('invalid_url', url=url)) + # SSL exceptions + except requests.exceptions.SSLError: + raise MoulinetteError(errno.EBADE, + m18n.g('download_ssl_error', url=url)) + # Timeout exceptions + except requests.exceptions.Timeout: + raise MoulinetteError(errno.ETIME, + m18n.g('download_timeout', url=url)) + # Unknown stuff + except Exception as e: + raise MoulinetteError(errno.ECONNRESET, + m18n.g('download_unknown_error', + url=url, error=str(e))) + # Assume error if status code is not 200 (OK) + if r.status_code != 200: + raise MoulinetteError(errno.EBADE, + m18n.g('download_bad_status_code', + url=url, code=str(r.status_code))) + + return r.text + + +def download_json(url, timeout=30): + + # Fetch the data + text = download_text(url, timeout) + + # Try to load json to check if it's syntaxically correct + try: + loaded_json = json.loads(text) + except ValueError: + raise MoulinetteError(errno.EINVAL, + m18n.g('corrupted_json', ressource=url)) + + return loaded_json diff --git a/moulinette/utils/process.py b/moulinette/utils/process.py index c34d6fab..6475f056 100644 --- a/moulinette/utils/process.py +++ b/moulinette/utils/process.py @@ -1,6 +1,9 @@ +import errno import time import subprocess +from moulinette.core import MoulinetteError + # This import is unused in this file. It will be deleted in future (W0611 PEP8), # but for the momment we keep it due to yunohost moulinette script that used # process.quote syntax to access this module ! @@ -102,8 +105,8 @@ def call_async_output(args, callback, **kwargs): # Call multiple commands ----------------------------------------------- -def check_commands(cmds, raise_on_error=False, callback=None, - separate_stderr=False, shell=True, **kwargs): +def run_commands(cmds, callback=None, separate_stderr=False, shell=True, + **kwargs): """Run multiple commands with error management Run a list of commands and allow to manage how to treat errors either @@ -127,9 +130,9 @@ def check_commands(cmds, raise_on_error=False, callback=None, Keyword arguments: - cmds -- List of commands to run - - raise_on_error -- True to raise a CalledProcessError on error if - no callback is provided - - callback -- Method or object to call on command failure + - callback -- Method or object to call on command failure. If no + callback is given, a "subprocess.CalledProcessError" + will be raised in case of command failure. - separate_stderr -- True to return command output as a 2-tuple - **kwargs -- Additional arguments for the Popen constructor @@ -137,20 +140,19 @@ def check_commands(cmds, raise_on_error=False, callback=None, Number of failed commands """ + + # stdout and stderr are specified by this code later, so they cannot be + # overriden by user input for a in ['stdout', 'stderr']: if a in kwargs: raise ValueError('%s argument not allowed, ' 'it will be overridden.' % a) - error = 0 + # If no callback specified... if callback is None: - if raise_on_error: - # Raise on command failure - def callback(r, c, o): - raise CalledProcessError(r, c, o) - else: - # Continue commands execution - callback = lambda r, c, o: True + # Raise CalledProcessError on command failure + def callback(r, c, o): + raise CalledProcessError(r, c, o) elif not callable(callback): raise ValueError('callback argument must be callable') @@ -163,9 +165,12 @@ def check_commands(cmds, raise_on_error=False, callback=None, _get_output = lambda o, e: o # Iterate over commands + error = 0 for cmd in cmds: + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=_stderr, shell=shell, **kwargs) + output = _get_output(*process.communicate()) retcode = process.poll() if retcode: diff --git a/moulinette/utils/tests/conftest.py b/moulinette/utils/tests/conftest.py new file mode 100644 index 00000000..2bc5b2b5 --- /dev/null +++ b/moulinette/utils/tests/conftest.py @@ -0,0 +1,96 @@ +import sys +import moulinette + +sys.path.append("..") + +############################################################################### +# Tweak moulinette init to have yunohost namespace # +############################################################################### + + +old_init = moulinette.core.Moulinette18n.__init__ +def monkey_path_i18n_init(self, package, default_locale="en"): + old_init(self, package, default_locale) + self.load_namespace("moulinette") +moulinette.core.Moulinette18n.__init__ = monkey_path_i18n_init + + +############################################################################### +# Tweak translator to raise exceptions if string keys are not defined # +############################################################################### + + +old_translate = moulinette.core.Translator.translate +def new_translate(self, key, *args, **kwargs): + + if key not in self._translations[self.default_locale].keys(): + raise KeyError("Unable to retrieve key %s for default locale !" % key) + + return old_translate(self, key, *args, **kwargs) +moulinette.core.Translator.translate = new_translate + +def new_m18nn(self, key, *args, **kwargs): + return self._global.translate(key, *args, **kwargs) +moulinette.core.Moulinette18n.g = new_m18nn + + +############################################################################### +# Init the moulinette to have the cli loggers stuff # +############################################################################### + + +def pytest_cmdline_main(config): + """Configure logging and initialize the moulinette""" + # Define loggers handlers + handlers = set(['tty']) + root_handlers = set(handlers) + + # Define loggers level + level = 'INFO' + tty_level = 'SUCCESS' + + # Custom logging configuration + logging = { + 'version': 1, + 'disable_existing_loggers': True, + 'formatters': { + 'tty-debug': { + 'format': '%(relativeCreated)-4d %(fmessage)s' + }, + 'precise': { + 'format': '%(asctime)-15s %(levelname)-8s %(name)s %(funcName)s - %(fmessage)s' + }, + }, + 'filters': { + 'action': { + '()': 'moulinette.utils.log.ActionFilter', + }, + }, + 'handlers': { + 'tty': { + 'level': tty_level, + 'class': 'moulinette.interfaces.cli.TTYHandler', + 'formatter': '', + }, + }, + 'loggers': { + 'moulinette': { + 'level': level, + 'handlers': [], + 'propagate': True, + }, + 'moulinette.interface': { + 'level': level, + 'handlers': handlers, + 'propagate': False, + }, + }, + 'root': { + 'level': level, + 'handlers': root_handlers, + }, + } + + # Initialize moulinette + moulinette.init(logging_config=logging, _from_source=False) + diff --git a/moulinette/utils/tests/test_filesystem.py b/moulinette/utils/tests/test_filesystem.py new file mode 100644 index 00000000..c79bee40 --- /dev/null +++ b/moulinette/utils/tests/test_filesystem.py @@ -0,0 +1,299 @@ + +# General python lib +import os +import pwd +import pytest +import requests +import requests_mock + +# Moulinette specific +from moulinette.core import MoulinetteError +from moulinette.utils.filesystem import ( + read_file, read_json, + write_to_file, append_to_file, write_to_json, + rm, + chmod, chown) + +# We define a dummy context with test folders and files + +TEST_URL = "https://some.test.url/yolo.txt" +TMP_TEST_DIR = "/tmp/test_iohelpers" +TMP_TEST_FILE = "%s/foofile" % TMP_TEST_DIR +TMP_TEST_JSON = "%s/barjson" % TMP_TEST_DIR +NON_ROOT_USER = "admin" +NON_ROOT_GROUP = "mail" + + +def setup_function(function): + + os.system("rm -rf %s" % TMP_TEST_DIR) + os.system("mkdir %s" % TMP_TEST_DIR) + os.system("echo 'foo\nbar' > %s" % TMP_TEST_FILE) + os.system("echo '{ \"foo\":\"bar\" }' > %s" % TMP_TEST_JSON) + os.system("chmod 700 %s" % TMP_TEST_FILE) + os.system("chmod 700 %s" % TMP_TEST_JSON) + + +def teardown_function(function): + + os.seteuid(0) + os.system("rm -rf /tmp/test_iohelpers/") + + +# Helper to try stuff as non-root +def switch_to_non_root_user(): + + nonrootuser = pwd.getpwnam(NON_ROOT_USER).pw_uid + os.seteuid(nonrootuser) + + +############################################################################### +# Test file read # +############################################################################### + + +def test_read_file(): + + content = read_file(TMP_TEST_FILE) + assert content == "foo\nbar\n" + + +def test_read_file_badfile(): + + with pytest.raises(MoulinetteError): + read_file(TMP_TEST_FILE+"nope") + + +def test_read_file_badpermissions(): + + switch_to_non_root_user() + with pytest.raises(MoulinetteError): + read_file(TMP_TEST_FILE) + + +def test_read_json(): + + content = read_json(TMP_TEST_JSON) + assert "foo" in content.keys() + assert content["foo"] == "bar" + + +def test_read_json_badjson(): + + os.system("echo '{ not valid json lol }' > %s" % TMP_TEST_JSON) + + with pytest.raises(MoulinetteError): + read_json(TMP_TEST_JSON) + + +############################################################################### +# Test file write # +############################################################################### + + +def test_write_to_existing_file(): + + assert os.path.exists(TMP_TEST_FILE) + write_to_file(TMP_TEST_FILE, "yolo\nswag") + assert read_file(TMP_TEST_FILE) == "yolo\nswag" + + +def test_write_to_new_file(): + + new_file = "%s/barfile" % TMP_TEST_DIR + assert not os.path.exists(new_file) + write_to_file(new_file, "yolo\nswag") + assert os.path.exists(new_file) + assert read_file(new_file) == "yolo\nswag" + + +def test_write_to_existing_file_badpermissions(): + + assert os.path.exists(TMP_TEST_FILE) + switch_to_non_root_user() + with pytest.raises(MoulinetteError): + write_to_file(TMP_TEST_FILE, "yolo\nswag") + + +def test_write_to_new_file_badpermissions(): + + switch_to_non_root_user() + new_file = "%s/barfile" % TMP_TEST_DIR + assert not os.path.exists(new_file) + with pytest.raises(MoulinetteError): + write_to_file(new_file, "yolo\nswag") + + +def test_write_to_folder(): + + with pytest.raises(AssertionError): + write_to_file(TMP_TEST_DIR, "yolo\nswag") + + +def test_write_inside_nonexistent_folder(): + + with pytest.raises(AssertionError): + write_to_file("/toto/test", "yolo\nswag") + + +def test_write_to_file_with_a_list(): + + assert os.path.exists(TMP_TEST_FILE) + write_to_file(TMP_TEST_FILE, ["yolo", "swag"]) + assert read_file(TMP_TEST_FILE) == "yolo\nswag" + + +def test_append_to_existing_file(): + + assert os.path.exists(TMP_TEST_FILE) + append_to_file(TMP_TEST_FILE, "yolo\nswag") + assert read_file(TMP_TEST_FILE) == "foo\nbar\nyolo\nswag" + + +def test_append_to_new_file(): + + new_file = "%s/barfile" % TMP_TEST_DIR + assert not os.path.exists(new_file) + append_to_file(new_file, "yolo\nswag") + assert os.path.exists(new_file) + assert read_file(new_file) == "yolo\nswag" + + +def text_write_dict_to_json(): + + dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]} + write_to_json(TMP_TEST_FILE, dummy_dict) + j = read_json(TMP_TEST_FILE) + assert "foo" in j.keys() + assert "bar" in j.keys() + assert j["foo"] == 42 + assert j["bar"] == ["a", "b", "c"] + assert read_file(TMP_TEST_FILE) == "foo\nbar\nyolo\nswag" + + +def text_write_list_to_json(): + + dummy_list = ["foo", "bar", "baz"] + write_to_json(TMP_TEST_FILE, dummy_list) + j = read_json(TMP_TEST_FILE) + assert j == ["foo", "bar", "baz"] + + +def test_write_to_json_badpermissions(): + + switch_to_non_root_user() + dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]} + with pytest.raises(MoulinetteError): + write_to_json(TMP_TEST_FILE, dummy_dict) + + +def test_write_json_inside_nonexistent_folder(): + + with pytest.raises(AssertionError): + write_to_file("/toto/test.json", ["a", "b"]) + + +############################################################################### +# Test file remove # +############################################################################### + + +def test_remove_file(): + + rm(TMP_TEST_FILE) + assert not os.path.exists(TMP_TEST_FILE) + + +def test_remove_file_badpermissions(): + + switch_to_non_root_user() + with pytest.raises(MoulinetteError): + rm(TMP_TEST_FILE) + + +def test_remove_directory(): + + rm(TMP_TEST_DIR, recursive=True) + assert not os.path.exists(TMP_TEST_DIR) + + +############################################################################### +# Test permission change # +############################################################################### + + +def get_permissions(file_path): + from stat import ST_MODE + return (pwd.getpwuid(os.stat(file_path).st_uid).pw_name, + pwd.getpwuid(os.stat(file_path).st_gid).pw_name, + oct(os.stat(file_path)[ST_MODE])[-3:]) + + +# FIXME - should split the test of chown / chmod as independent tests +def set_permissions(f, owner, group, perms): + chown(f, owner, group) + chmod(f, perms) + + +def test_setpermissions_file(): + + # Check we're at the default permissions + assert get_permissions(TMP_TEST_FILE) == ("root", "root", "700") + + # Change the permissions + set_permissions(TMP_TEST_FILE, NON_ROOT_USER, NON_ROOT_GROUP, 0111) + + # Check the permissions got changed + assert get_permissions(TMP_TEST_FILE) == (NON_ROOT_USER, NON_ROOT_GROUP, "111") + + # Change the permissions again + set_permissions(TMP_TEST_FILE, "root", "root", 0777) + + # Check the permissions got changed + assert get_permissions(TMP_TEST_FILE) == ("root", "root", "777") + + +def test_setpermissions_directory(): + + # Check we're at the default permissions + assert get_permissions(TMP_TEST_DIR) == ("root", "root", "755") + + # Change the permissions + set_permissions(TMP_TEST_DIR, NON_ROOT_USER, NON_ROOT_GROUP, 0111) + + # Check the permissions got changed + assert get_permissions(TMP_TEST_DIR) == (NON_ROOT_USER, NON_ROOT_GROUP, "111") + + # Change the permissions again + set_permissions(TMP_TEST_DIR, "root", "root", 0777) + + # Check the permissions got changed + assert get_permissions(TMP_TEST_DIR) == ("root", "root", "777") + + +def test_setpermissions_permissiondenied(): + + switch_to_non_root_user() + + with pytest.raises(MoulinetteError): + set_permissions(TMP_TEST_FILE, NON_ROOT_USER, NON_ROOT_GROUP, 0111) + + +def test_setpermissions_badfile(): + + with pytest.raises(MoulinetteError): + set_permissions("/foo/bar/yolo", NON_ROOT_USER, NON_ROOT_GROUP, 0111) + + +def test_setpermissions_baduser(): + + with pytest.raises(MoulinetteError): + set_permissions(TMP_TEST_FILE, "foo", NON_ROOT_GROUP, 0111) + + +def test_setpermissions_badgroup(): + + with pytest.raises(MoulinetteError): + set_permissions(TMP_TEST_FILE, NON_ROOT_USER, "foo", 0111) + + diff --git a/moulinette/utils/tests/test_network.py b/moulinette/utils/tests/test_network.py new file mode 100644 index 00000000..588f7a3c --- /dev/null +++ b/moulinette/utils/tests/test_network.py @@ -0,0 +1,92 @@ + +# General python lib +import os +import pwd +import pytest +import requests +import requests_mock + +# Moulinette specific +from moulinette.core import MoulinetteError +from moulinette.utils.network import download_text, download_json + +# We define a dummy context with test folders and files + +TEST_URL = "https://some.test.url/yolo.txt" + +def setup_function(function): + + pass + +def teardown_function(function): + + pass + +############################################################################### +# Test download # +############################################################################### + + +def test_download(): + + with requests_mock.Mocker() as m: + m.register_uri("GET", TEST_URL, text='some text') + + fetched_text = download_text(TEST_URL) + + assert fetched_text == "some text" + + +def test_download_badurl(): + + with pytest.raises(MoulinetteError): + download_text(TEST_URL) + + +def test_download_404(): + + with requests_mock.Mocker() as m: + m.register_uri("GET", TEST_URL, status_code=404) + + with pytest.raises(MoulinetteError): + download_text(TEST_URL) + + +def test_download_sslerror(): + + with requests_mock.Mocker() as m: + m.register_uri("GET", TEST_URL, exc=requests.exceptions.SSLError) + + with pytest.raises(MoulinetteError): + download_text(TEST_URL) + + +def test_download_timeout(): + + with requests_mock.Mocker() as m: + m.register_uri("GET", TEST_URL, exc=requests.exceptions.ConnectTimeout) + + with pytest.raises(MoulinetteError): + download_text(TEST_URL) + + +def test_download_json(): + + with requests_mock.Mocker() as m: + m.register_uri("GET", TEST_URL, text='{ "foo":"bar" }') + + fetched_json = download_json(TEST_URL) + + assert "foo" in fetched_json.keys() + assert fetched_json["foo"] == "bar" + + +def test_download_json_badjson(): + + with requests_mock.Mocker() as m: + m.register_uri("GET", TEST_URL, text='{ not json lol }') + + with pytest.raises(MoulinetteError): + download_json(TEST_URL) + + diff --git a/moulinette/utils/tests/test_process.py b/moulinette/utils/tests/test_process.py new file mode 100644 index 00000000..a37a0f3b --- /dev/null +++ b/moulinette/utils/tests/test_process.py @@ -0,0 +1,66 @@ +# General python lib +import os +import pwd +import pytest + +# Moulinette specific +from subprocess import CalledProcessError +from moulinette.utils.process import run_commands + +# We define a dummy context with test folders and files + +TMP_TEST_DIR = "/tmp/test_iohelpers" +TMP_TEST_FILE = "%s/foofile" % TMP_TEST_DIR +NON_ROOT_USER = "admin" +NON_ROOT_GROUP = "mail" + + +def setup_function(function): + + os.system("rm -rf %s" % TMP_TEST_DIR) + os.system("mkdir %s" % TMP_TEST_DIR) + os.system("echo 'foo\nbar' > %s" % TMP_TEST_FILE) + os.system("chmod 700 %s" % TMP_TEST_FILE) + + +def teardown_function(function): + + os.seteuid(0) + os.system("rm -rf /tmp/test_iohelpers/") + + +# Helper to try stuff as non-root +def switch_to_non_root_user(): + + nonrootuser = pwd.getpwnam(NON_ROOT_USER).pw_uid + os.seteuid(nonrootuser) + +############################################################################### +# Test run shell commands # +############################################################################### + +def test_run_shell_command_list(): + + commands = ["rm -f %s" % TMP_TEST_FILE] + + assert os.path.exists(TMP_TEST_FILE) + run_commands(commands) + assert not os.path.exists(TMP_TEST_FILE) + + +def test_run_shell_badcommand(): + + commands = ["yolo swag"] + + with pytest.raises(CalledProcessError): + run_commands(commands) + + +def test_run_shell_command_badpermissions(): + + commands = ["rm -f %s" % TMP_TEST_FILE] + + switch_to_non_root_user() + with pytest.raises(CalledProcessError): + run_commands(commands) +