[enh] More helpers for common IO operations (#141)

This commit is contained in:
Alexandre Aubin 2017-07-21 21:05:46 -04:00
parent 92e970af98
commit c9967372f1
8 changed files with 806 additions and 34 deletions

View file

@ -36,5 +36,18 @@
"unknown_user": "Unknown '{user}' user", "unknown_user": "Unknown '{user}' user",
"values_mismatch": "Values don't match", "values_mismatch": "Values don't match",
"warning": "Warning:", "warning": "Warning:",
"websocket_request_expected": "Expected a WebSocket request" "websocket_request_expected": "Expected a WebSocket request",
"cannot_open_file": "Could not open file {file:s} (reason: {error:s})",
"cannot_write_file": "Could not write file {file:s} (reason: {error:s})",
"unknown_error_reading_file": "Unknown error while trying to read file {file:s}",
"corrupted_json": "Corrupted json read from {ressource:s} (reason: {error:s})",
"error_writing_file": "Error when writing file {file:s}: {error:s}",
"error_removing": "Error when removing {path:s}: {error:s}",
"error_changing_file_permissions": "Error when changing permissions for {path:s}: {error:s}",
"invalid_url": "Invalid url {url:s} (does this site exists ?)",
"download_ssl_error": "SSL error when connecting to {url:s}",
"download_timeout": "{url:s} took too long to answer, gave up.",
"download_unknown_error": "Error when downloading data from {url:s} : {error:s}",
"download_bad_status_code": "{url:s} returned status code {code:s}",
"command_unknown": "Command '{command:s}' unknown ?"
} }

View file

@ -1,13 +1,140 @@
import os import os
import errno import errno
import shutil import shutil
import json
import grp
from pwd import getpwnam from pwd import getpwnam
from moulinette.core import MoulinetteError from moulinette.core import MoulinetteError
# Files & directories -------------------------------------------------- # Files & directories --------------------------------------------------
def read_file(file_path):
"""
Read a regular text file
Keyword argument:
file_path -- Path to the text file
"""
assert isinstance(file_path, basestring)
# Check file exists
if not os.path.isfile(file_path):
raise MoulinetteError(errno.ENOENT,
m18n.g('file_not_exist', path=file_path))
# Open file and read content
try:
with open(file_path, "r") as f:
file_content = f.read()
except IOError as e:
raise MoulinetteError(errno.EACCES,
m18n.g('cannot_open_file',
file=file_path, error=str(e)))
except Exception as e:
raise MoulinetteError(errno.EIO,
m18n.g('error_reading_file',
file=file_path, error=str(e)))
return file_content
def read_json(file_path):
"""
Read a json file
Keyword argument:
file_path -- Path to the json file
"""
# Read file
file_content = read_file(file_path)
# Try to load json to check if it's syntaxically correct
try:
loaded_json = json.loads(file_content)
except ValueError as e:
raise MoulinetteError(errno.EINVAL,
m18n.g('corrupted_json',
ressource=file_path, error=str(e)))
return loaded_json
def write_to_file(file_path, data, file_mode="w"):
"""
Write a single string or a list of string to a text file.
The text file will be overwritten by default.
Keyword argument:
file_path -- Path to the output file
data -- The data to write (must be a string or list of string)
file_mode -- Mode used when writing the file. Option meant to be used
by append_to_file to avoid duplicating the code of this function.
"""
assert isinstance(data, basestring) or isinstance(data, list)
assert not os.path.isdir(file_path)
assert os.path.isdir(os.path.dirname(file_path))
# If data is a list, check elements are strings and build a single string
if not isinstance(data, basestring):
for element in data:
assert isinstance(element, basestring)
data = '\n'.join(data)
try:
with open(file_path, file_mode) as f:
f.write(data)
except IOError as e:
raise MoulinetteError(errno.EACCES,
m18n.g('cannot_write_file',
file=file_path, error=str(e)))
except Exception as e:
raise MoulinetteError(errno.EIO,
m18n.g('error_writing_file',
file=file_path, error=str(e)))
def append_to_file(file_path, data):
"""
Append a single string or a list of string to a text file.
Keyword argument:
file_path -- Path to the output file
data -- The data to write (must be a string or list of string)
"""
write_to_file(file_path, data, file_mode="a")
def write_to_json(file_path, data):
"""
Write a dictionnary or a list to a json file
Keyword argument:
file_path -- Path to the output json file
data -- The data to write (must be a dict or a list)
"""
# Assumptions
assert isinstance(file_path, basestring)
assert isinstance(data, dict) or isinstance(data, list)
assert not os.path.isdir(file_path)
assert os.path.isdir(os.path.dirname(file_path))
# Write dict to file
try:
with open(file_path, "w") as f:
json.dump(data, f)
except IOError as e:
raise MoulinetteError(errno.EACCES,
m18n.g('cannot_write_file',
file=file_path, error=str(e)))
except Exception as e:
raise MoulinetteError(errno.EIO,
m18n.g('_error_writing_file',
file=file_path, error=str(e)))
def mkdir(path, mode=0777, parents=False, uid=None, gid=None, force=False): def mkdir(path, mode=0777, parents=False, uid=None, gid=None, force=False):
"""Create a directory with optional features """Create a directory with optional features
@ -70,13 +197,14 @@ def chown(path, uid=None, gid=None, recursive=False):
uid = -1 uid = -1
if isinstance(gid, basestring): if isinstance(gid, basestring):
try: try:
gid = getpwnam(gid).gr_gid gid = grp.getgrnam(gid).gr_gid
except KeyError: except KeyError:
raise MoulinetteError(errno.EINVAL, raise MoulinetteError(errno.EINVAL,
m18n.g('unknown_group', group=gid)) m18n.g('unknown_group', group=gid))
elif gid is None: elif gid is None:
gid = -1 gid = -1
try:
os.chown(path, uid, gid) os.chown(path, uid, gid)
if recursive and os.path.isdir(path): if recursive and os.path.isdir(path):
for root, dirs, files in os.walk(path): for root, dirs, files in os.walk(path):
@ -84,6 +212,10 @@ def chown(path, uid=None, gid=None, recursive=False):
os.chown(os.path.join(root, d), uid, gid) os.chown(os.path.join(root, d), uid, gid)
for f in files: for f in files:
os.chown(os.path.join(root, f), uid, gid) os.chown(os.path.join(root, f), uid, gid)
except Exception as e:
raise MoulinetteError(errno.EIO,
m18n.g('error_changing_file_permissions',
path=path, error=str(e)))
def chmod(path, mode, fmode=None, recursive=False): def chmod(path, mode, fmode=None, recursive=False):
@ -95,6 +227,8 @@ def chmod(path, mode, fmode=None, recursive=False):
- recursive -- Operate on path recursively - recursive -- Operate on path recursively
""" """
try:
os.chmod(path, mode) os.chmod(path, mode)
if recursive and os.path.isdir(path): if recursive and os.path.isdir(path):
if fmode is None: if fmode is None:
@ -104,6 +238,10 @@ def chmod(path, mode, fmode=None, recursive=False):
os.chmod(os.path.join(root, d), mode) os.chmod(os.path.join(root, d), mode)
for f in files: for f in files:
os.chmod(os.path.join(root, f), fmode) os.chmod(os.path.join(root, f), fmode)
except Exception as e:
raise MoulinetteError(errno.EIO,
m18n.g('error_changing_file_permissions',
path=path, error=str(e)))
def rm(path, recursive=False, force=False): def rm(path, recursive=False, force=False):
@ -120,6 +258,8 @@ def rm(path, recursive=False, force=False):
else: else:
try: try:
os.remove(path) os.remove(path)
except OSError: except OSError as e:
if not force: if not force:
raise raise MoulinetteError(errno.EIO,
m18n.g('error_removing',
path=path, error=str(e)))

View file

@ -0,0 +1,61 @@
import errno
import requests
import json
from moulinette.core import MoulinetteError
def download_text(url, timeout=30):
"""
Download text from a url and returns the raw text
Keyword argument:
url -- The url to download the data from
timeout -- Number of seconds allowed for download to effectively start
before giving up
"""
# Assumptions
assert isinstance(url, str)
# Download file
try:
r = requests.get(url, timeout=timeout)
# Invalid URL
except requests.exceptions.ConnectionError:
raise MoulinetteError(errno.EBADE,
m18n.g('invalid_url', url=url))
# SSL exceptions
except requests.exceptions.SSLError:
raise MoulinetteError(errno.EBADE,
m18n.g('download_ssl_error', url=url))
# Timeout exceptions
except requests.exceptions.Timeout:
raise MoulinetteError(errno.ETIME,
m18n.g('download_timeout', url=url))
# Unknown stuff
except Exception as e:
raise MoulinetteError(errno.ECONNRESET,
m18n.g('download_unknown_error',
url=url, error=str(e)))
# Assume error if status code is not 200 (OK)
if r.status_code != 200:
raise MoulinetteError(errno.EBADE,
m18n.g('download_bad_status_code',
url=url, code=str(r.status_code)))
return r.text
def download_json(url, timeout=30):
# Fetch the data
text = download_text(url, timeout)
# Try to load json to check if it's syntaxically correct
try:
loaded_json = json.loads(text)
except ValueError:
raise MoulinetteError(errno.EINVAL,
m18n.g('corrupted_json', ressource=url))
return loaded_json

View file

@ -1,6 +1,9 @@
import errno
import time import time
import subprocess import subprocess
from moulinette.core import MoulinetteError
# This import is unused in this file. It will be deleted in future (W0611 PEP8), # This import is unused in this file. It will be deleted in future (W0611 PEP8),
# but for the momment we keep it due to yunohost moulinette script that used # but for the momment we keep it due to yunohost moulinette script that used
# process.quote syntax to access this module ! # process.quote syntax to access this module !
@ -102,8 +105,8 @@ def call_async_output(args, callback, **kwargs):
# Call multiple commands ----------------------------------------------- # Call multiple commands -----------------------------------------------
def check_commands(cmds, raise_on_error=False, callback=None, def run_commands(cmds, callback=None, separate_stderr=False, shell=True,
separate_stderr=False, shell=True, **kwargs): **kwargs):
"""Run multiple commands with error management """Run multiple commands with error management
Run a list of commands and allow to manage how to treat errors either Run a list of commands and allow to manage how to treat errors either
@ -127,9 +130,9 @@ def check_commands(cmds, raise_on_error=False, callback=None,
Keyword arguments: Keyword arguments:
- cmds -- List of commands to run - cmds -- List of commands to run
- raise_on_error -- True to raise a CalledProcessError on error if - callback -- Method or object to call on command failure. If no
no callback is provided callback is given, a "subprocess.CalledProcessError"
- callback -- Method or object to call on command failure will be raised in case of command failure.
- separate_stderr -- True to return command output as a 2-tuple - separate_stderr -- True to return command output as a 2-tuple
- **kwargs -- Additional arguments for the Popen constructor - **kwargs -- Additional arguments for the Popen constructor
@ -137,20 +140,19 @@ def check_commands(cmds, raise_on_error=False, callback=None,
Number of failed commands Number of failed commands
""" """
# stdout and stderr are specified by this code later, so they cannot be
# overriden by user input
for a in ['stdout', 'stderr']: for a in ['stdout', 'stderr']:
if a in kwargs: if a in kwargs:
raise ValueError('%s argument not allowed, ' raise ValueError('%s argument not allowed, '
'it will be overridden.' % a) 'it will be overridden.' % a)
error = 0
# If no callback specified...
if callback is None: if callback is None:
if raise_on_error: # Raise CalledProcessError on command failure
# Raise on command failure
def callback(r, c, o): def callback(r, c, o):
raise CalledProcessError(r, c, o) raise CalledProcessError(r, c, o)
else:
# Continue commands execution
callback = lambda r, c, o: True
elif not callable(callback): elif not callable(callback):
raise ValueError('callback argument must be callable') raise ValueError('callback argument must be callable')
@ -163,9 +165,12 @@ def check_commands(cmds, raise_on_error=False, callback=None,
_get_output = lambda o, e: o _get_output = lambda o, e: o
# Iterate over commands # Iterate over commands
error = 0
for cmd in cmds: for cmd in cmds:
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=_stderr, shell=shell, **kwargs) stderr=_stderr, shell=shell, **kwargs)
output = _get_output(*process.communicate()) output = _get_output(*process.communicate())
retcode = process.poll() retcode = process.poll()
if retcode: if retcode:

View file

@ -0,0 +1,96 @@
import sys
import moulinette
sys.path.append("..")
###############################################################################
# Tweak moulinette init to have yunohost namespace #
###############################################################################
old_init = moulinette.core.Moulinette18n.__init__
def monkey_path_i18n_init(self, package, default_locale="en"):
old_init(self, package, default_locale)
self.load_namespace("moulinette")
moulinette.core.Moulinette18n.__init__ = monkey_path_i18n_init
###############################################################################
# Tweak translator to raise exceptions if string keys are not defined #
###############################################################################
old_translate = moulinette.core.Translator.translate
def new_translate(self, key, *args, **kwargs):
if key not in self._translations[self.default_locale].keys():
raise KeyError("Unable to retrieve key %s for default locale !" % key)
return old_translate(self, key, *args, **kwargs)
moulinette.core.Translator.translate = new_translate
def new_m18nn(self, key, *args, **kwargs):
return self._global.translate(key, *args, **kwargs)
moulinette.core.Moulinette18n.g = new_m18nn
###############################################################################
# Init the moulinette to have the cli loggers stuff #
###############################################################################
def pytest_cmdline_main(config):
"""Configure logging and initialize the moulinette"""
# Define loggers handlers
handlers = set(['tty'])
root_handlers = set(handlers)
# Define loggers level
level = 'INFO'
tty_level = 'SUCCESS'
# Custom logging configuration
logging = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'tty-debug': {
'format': '%(relativeCreated)-4d %(fmessage)s'
},
'precise': {
'format': '%(asctime)-15s %(levelname)-8s %(name)s %(funcName)s - %(fmessage)s'
},
},
'filters': {
'action': {
'()': 'moulinette.utils.log.ActionFilter',
},
},
'handlers': {
'tty': {
'level': tty_level,
'class': 'moulinette.interfaces.cli.TTYHandler',
'formatter': '',
},
},
'loggers': {
'moulinette': {
'level': level,
'handlers': [],
'propagate': True,
},
'moulinette.interface': {
'level': level,
'handlers': handlers,
'propagate': False,
},
},
'root': {
'level': level,
'handlers': root_handlers,
},
}
# Initialize moulinette
moulinette.init(logging_config=logging, _from_source=False)

View file

@ -0,0 +1,299 @@
# General python lib
import os
import pwd
import pytest
import requests
import requests_mock
# Moulinette specific
from moulinette.core import MoulinetteError
from moulinette.utils.filesystem import (
read_file, read_json,
write_to_file, append_to_file, write_to_json,
rm,
chmod, chown)
# We define a dummy context with test folders and files
TEST_URL = "https://some.test.url/yolo.txt"
TMP_TEST_DIR = "/tmp/test_iohelpers"
TMP_TEST_FILE = "%s/foofile" % TMP_TEST_DIR
TMP_TEST_JSON = "%s/barjson" % TMP_TEST_DIR
NON_ROOT_USER = "admin"
NON_ROOT_GROUP = "mail"
def setup_function(function):
os.system("rm -rf %s" % TMP_TEST_DIR)
os.system("mkdir %s" % TMP_TEST_DIR)
os.system("echo 'foo\nbar' > %s" % TMP_TEST_FILE)
os.system("echo '{ \"foo\":\"bar\" }' > %s" % TMP_TEST_JSON)
os.system("chmod 700 %s" % TMP_TEST_FILE)
os.system("chmod 700 %s" % TMP_TEST_JSON)
def teardown_function(function):
os.seteuid(0)
os.system("rm -rf /tmp/test_iohelpers/")
# Helper to try stuff as non-root
def switch_to_non_root_user():
nonrootuser = pwd.getpwnam(NON_ROOT_USER).pw_uid
os.seteuid(nonrootuser)
###############################################################################
# Test file read #
###############################################################################
def test_read_file():
content = read_file(TMP_TEST_FILE)
assert content == "foo\nbar\n"
def test_read_file_badfile():
with pytest.raises(MoulinetteError):
read_file(TMP_TEST_FILE+"nope")
def test_read_file_badpermissions():
switch_to_non_root_user()
with pytest.raises(MoulinetteError):
read_file(TMP_TEST_FILE)
def test_read_json():
content = read_json(TMP_TEST_JSON)
assert "foo" in content.keys()
assert content["foo"] == "bar"
def test_read_json_badjson():
os.system("echo '{ not valid json lol }' > %s" % TMP_TEST_JSON)
with pytest.raises(MoulinetteError):
read_json(TMP_TEST_JSON)
###############################################################################
# Test file write #
###############################################################################
def test_write_to_existing_file():
assert os.path.exists(TMP_TEST_FILE)
write_to_file(TMP_TEST_FILE, "yolo\nswag")
assert read_file(TMP_TEST_FILE) == "yolo\nswag"
def test_write_to_new_file():
new_file = "%s/barfile" % TMP_TEST_DIR
assert not os.path.exists(new_file)
write_to_file(new_file, "yolo\nswag")
assert os.path.exists(new_file)
assert read_file(new_file) == "yolo\nswag"
def test_write_to_existing_file_badpermissions():
assert os.path.exists(TMP_TEST_FILE)
switch_to_non_root_user()
with pytest.raises(MoulinetteError):
write_to_file(TMP_TEST_FILE, "yolo\nswag")
def test_write_to_new_file_badpermissions():
switch_to_non_root_user()
new_file = "%s/barfile" % TMP_TEST_DIR
assert not os.path.exists(new_file)
with pytest.raises(MoulinetteError):
write_to_file(new_file, "yolo\nswag")
def test_write_to_folder():
with pytest.raises(AssertionError):
write_to_file(TMP_TEST_DIR, "yolo\nswag")
def test_write_inside_nonexistent_folder():
with pytest.raises(AssertionError):
write_to_file("/toto/test", "yolo\nswag")
def test_write_to_file_with_a_list():
assert os.path.exists(TMP_TEST_FILE)
write_to_file(TMP_TEST_FILE, ["yolo", "swag"])
assert read_file(TMP_TEST_FILE) == "yolo\nswag"
def test_append_to_existing_file():
assert os.path.exists(TMP_TEST_FILE)
append_to_file(TMP_TEST_FILE, "yolo\nswag")
assert read_file(TMP_TEST_FILE) == "foo\nbar\nyolo\nswag"
def test_append_to_new_file():
new_file = "%s/barfile" % TMP_TEST_DIR
assert not os.path.exists(new_file)
append_to_file(new_file, "yolo\nswag")
assert os.path.exists(new_file)
assert read_file(new_file) == "yolo\nswag"
def text_write_dict_to_json():
dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]}
write_to_json(TMP_TEST_FILE, dummy_dict)
j = read_json(TMP_TEST_FILE)
assert "foo" in j.keys()
assert "bar" in j.keys()
assert j["foo"] == 42
assert j["bar"] == ["a", "b", "c"]
assert read_file(TMP_TEST_FILE) == "foo\nbar\nyolo\nswag"
def text_write_list_to_json():
dummy_list = ["foo", "bar", "baz"]
write_to_json(TMP_TEST_FILE, dummy_list)
j = read_json(TMP_TEST_FILE)
assert j == ["foo", "bar", "baz"]
def test_write_to_json_badpermissions():
switch_to_non_root_user()
dummy_dict = {"foo": 42, "bar": ["a", "b", "c"]}
with pytest.raises(MoulinetteError):
write_to_json(TMP_TEST_FILE, dummy_dict)
def test_write_json_inside_nonexistent_folder():
with pytest.raises(AssertionError):
write_to_file("/toto/test.json", ["a", "b"])
###############################################################################
# Test file remove #
###############################################################################
def test_remove_file():
rm(TMP_TEST_FILE)
assert not os.path.exists(TMP_TEST_FILE)
def test_remove_file_badpermissions():
switch_to_non_root_user()
with pytest.raises(MoulinetteError):
rm(TMP_TEST_FILE)
def test_remove_directory():
rm(TMP_TEST_DIR, recursive=True)
assert not os.path.exists(TMP_TEST_DIR)
###############################################################################
# Test permission change #
###############################################################################
def get_permissions(file_path):
from stat import ST_MODE
return (pwd.getpwuid(os.stat(file_path).st_uid).pw_name,
pwd.getpwuid(os.stat(file_path).st_gid).pw_name,
oct(os.stat(file_path)[ST_MODE])[-3:])
# FIXME - should split the test of chown / chmod as independent tests
def set_permissions(f, owner, group, perms):
chown(f, owner, group)
chmod(f, perms)
def test_setpermissions_file():
# Check we're at the default permissions
assert get_permissions(TMP_TEST_FILE) == ("root", "root", "700")
# Change the permissions
set_permissions(TMP_TEST_FILE, NON_ROOT_USER, NON_ROOT_GROUP, 0111)
# Check the permissions got changed
assert get_permissions(TMP_TEST_FILE) == (NON_ROOT_USER, NON_ROOT_GROUP, "111")
# Change the permissions again
set_permissions(TMP_TEST_FILE, "root", "root", 0777)
# Check the permissions got changed
assert get_permissions(TMP_TEST_FILE) == ("root", "root", "777")
def test_setpermissions_directory():
# Check we're at the default permissions
assert get_permissions(TMP_TEST_DIR) == ("root", "root", "755")
# Change the permissions
set_permissions(TMP_TEST_DIR, NON_ROOT_USER, NON_ROOT_GROUP, 0111)
# Check the permissions got changed
assert get_permissions(TMP_TEST_DIR) == (NON_ROOT_USER, NON_ROOT_GROUP, "111")
# Change the permissions again
set_permissions(TMP_TEST_DIR, "root", "root", 0777)
# Check the permissions got changed
assert get_permissions(TMP_TEST_DIR) == ("root", "root", "777")
def test_setpermissions_permissiondenied():
switch_to_non_root_user()
with pytest.raises(MoulinetteError):
set_permissions(TMP_TEST_FILE, NON_ROOT_USER, NON_ROOT_GROUP, 0111)
def test_setpermissions_badfile():
with pytest.raises(MoulinetteError):
set_permissions("/foo/bar/yolo", NON_ROOT_USER, NON_ROOT_GROUP, 0111)
def test_setpermissions_baduser():
with pytest.raises(MoulinetteError):
set_permissions(TMP_TEST_FILE, "foo", NON_ROOT_GROUP, 0111)
def test_setpermissions_badgroup():
with pytest.raises(MoulinetteError):
set_permissions(TMP_TEST_FILE, NON_ROOT_USER, "foo", 0111)

View file

@ -0,0 +1,92 @@
# General python lib
import os
import pwd
import pytest
import requests
import requests_mock
# Moulinette specific
from moulinette.core import MoulinetteError
from moulinette.utils.network import download_text, download_json
# We define a dummy context with test folders and files
TEST_URL = "https://some.test.url/yolo.txt"
def setup_function(function):
pass
def teardown_function(function):
pass
###############################################################################
# Test download #
###############################################################################
def test_download():
with requests_mock.Mocker() as m:
m.register_uri("GET", TEST_URL, text='some text')
fetched_text = download_text(TEST_URL)
assert fetched_text == "some text"
def test_download_badurl():
with pytest.raises(MoulinetteError):
download_text(TEST_URL)
def test_download_404():
with requests_mock.Mocker() as m:
m.register_uri("GET", TEST_URL, status_code=404)
with pytest.raises(MoulinetteError):
download_text(TEST_URL)
def test_download_sslerror():
with requests_mock.Mocker() as m:
m.register_uri("GET", TEST_URL, exc=requests.exceptions.SSLError)
with pytest.raises(MoulinetteError):
download_text(TEST_URL)
def test_download_timeout():
with requests_mock.Mocker() as m:
m.register_uri("GET", TEST_URL, exc=requests.exceptions.ConnectTimeout)
with pytest.raises(MoulinetteError):
download_text(TEST_URL)
def test_download_json():
with requests_mock.Mocker() as m:
m.register_uri("GET", TEST_URL, text='{ "foo":"bar" }')
fetched_json = download_json(TEST_URL)
assert "foo" in fetched_json.keys()
assert fetched_json["foo"] == "bar"
def test_download_json_badjson():
with requests_mock.Mocker() as m:
m.register_uri("GET", TEST_URL, text='{ not json lol }')
with pytest.raises(MoulinetteError):
download_json(TEST_URL)

View file

@ -0,0 +1,66 @@
# General python lib
import os
import pwd
import pytest
# Moulinette specific
from subprocess import CalledProcessError
from moulinette.utils.process import run_commands
# We define a dummy context with test folders and files
TMP_TEST_DIR = "/tmp/test_iohelpers"
TMP_TEST_FILE = "%s/foofile" % TMP_TEST_DIR
NON_ROOT_USER = "admin"
NON_ROOT_GROUP = "mail"
def setup_function(function):
os.system("rm -rf %s" % TMP_TEST_DIR)
os.system("mkdir %s" % TMP_TEST_DIR)
os.system("echo 'foo\nbar' > %s" % TMP_TEST_FILE)
os.system("chmod 700 %s" % TMP_TEST_FILE)
def teardown_function(function):
os.seteuid(0)
os.system("rm -rf /tmp/test_iohelpers/")
# Helper to try stuff as non-root
def switch_to_non_root_user():
nonrootuser = pwd.getpwnam(NON_ROOT_USER).pw_uid
os.seteuid(nonrootuser)
###############################################################################
# Test run shell commands #
###############################################################################
def test_run_shell_command_list():
commands = ["rm -f %s" % TMP_TEST_FILE]
assert os.path.exists(TMP_TEST_FILE)
run_commands(commands)
assert not os.path.exists(TMP_TEST_FILE)
def test_run_shell_badcommand():
commands = ["yolo swag"]
with pytest.raises(CalledProcessError):
run_commands(commands)
def test_run_shell_command_badpermissions():
commands = ["rm -f %s" % TMP_TEST_FILE]
switch_to_non_root_user()
with pytest.raises(CalledProcessError):
run_commands(commands)