diff --git a/src/yunohost/tests/test_user-group.py b/src/yunohost/tests/test_user-group.py index 251029796..e83425df9 100644 --- a/src/yunohost/tests/test_user-group.py +++ b/src/yunohost/tests/test_user-group.py @@ -1,5 +1,6 @@ import pytest +<<<<<<< HEAD from .conftest import message, raiseYunohostError from yunohost.user import ( @@ -8,6 +9,10 @@ from yunohost.user import ( user_create, user_delete, user_update, + user_import, + user_export, + CSV_FIELDNAMES, + FIRST_ALIASES, user_group_list, user_group_create, user_group_delete, @@ -110,6 +115,65 @@ def test_del_user(mocker): assert "alice" not in group_res["all_users"]["members"] +def test_import_user(mocker): + import csv + from io import BytesIO + fieldnames = [u'username', u'firstname', u'lastname', u'password', + u'mailbox-quota', u'mail', u'mail-alias', u'mail-forward', + u'groups'] + with BytesIO() as csv_io: + writer = csv.DictWriter(csv_io, fieldnames, delimiter=';', + quotechar='"') + writer.writeheader() + writer.writerow({ + 'username': "albert", + 'firstname': "Albert", + 'lastname': "Good", + 'password': "", + 'mailbox-quota': "1G", + 'mail': "albert@" + maindomain, + 'mail-alias': "albert2@" + maindomain, + 'mail-forward': "albert@example.com", + 'groups': "dev", + }) + writer.writerow({ + 'username': "alice", + 'firstname': "Alice", + 'lastname': "White", + 'password': "", + 'mailbox-quota': "1G", + 'mail': "alice@" + maindomain, + 'mail-alias': "alice1@" + maindomain + ",alice2@" + maindomain, + 'mail-forward': "", + 'groups': "apps", + }) + csv_io.seek(0) + with message(mocker, "user_import_success"): + user_import(csv_io, update=True, delete=True) + + group_res = user_group_list()['groups'] + user_res = user_list(CSV_FIELDNAMES)['users'] + assert "albert" in user_res + assert "alice" in user_res + assert "bob" not in user_res + assert len(user_res['alice']['mail-alias']) == 2 + assert "albert" in group_res['dev']['members'] + assert "alice" in group_res['apps']['members'] + + +def test_export_user(mocker): + result = user_export() + should_be = "username;firstname;lastname;password;" + should_be += "mailbox-quota;mail;mail-alias;mail-forward;groups" + should_be += "\r\nbob;Bob;Snow;;0;bob@" + maindomain + ";;;apps" + should_be += "\r\nalice;Alice;White;;0;alice@" + maindomain + ";" + should_be += ','.join([alias + maindomain for alias in FIRST_ALIASES]) + should_be += ";;dev" + should_be += "\r\njack;Jack;Black;;0;jack@" + maindomain + ";;;" + + assert result == should_be + + def test_create_group(mocker): with message(mocker, "group_created", group="adminsys"): diff --git a/src/yunohost/user.py b/src/yunohost/user.py index 0fae9cf43..0bcce9cbc 100644 --- a/src/yunohost/user.py +++ b/src/yunohost/user.py @@ -43,6 +43,19 @@ from yunohost.log import is_unit_operation logger = getActionLogger("yunohost.user") +CSV_FIELDNAMES = [u'username', u'firstname', u'lastname', u'password', u'mailbox-quota', u'mail', u'mail-alias', u'mail-forward', u'groups'] +VALIDATORS = { + 'username': r'^[a-z0-9_]+$', + 'firstname': r'^([^\W\d_]{2,30}[ ,.\'-]{0,3})+$', #FIXME Merge first and lastname and support more name (arabish, chinese...) + 'lastname': r'^([^\W\d_]{2,30}[ ,.\'-]{0,3})+$', + 'password': r'^|(.{3,})$', + 'mail': r'^([\w.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}))$', + 'mail-alias': r'^|([\w.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}),?)+$', + 'mail-forward': r'^|([\w\+.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}),?)+$', + 'mailbox-quota': r'^(\d+[bkMGT])|0$', + 'groups': r'^|([a-z0-9_]+(,?[a-z0-9_]+)*)$' +} +FIRST_ALIASES = ['root@', 'admin@', 'webmaster@', 'postmaster@', 'abuse@'] def user_list(fields=None): @@ -87,7 +100,7 @@ def user_list(fields=None): for field in fields: if field in ldap_attrs: - attrs|=set([ldap_attrs[field]]) + attrs |= set([ldap_attrs[field]]) else: raise YunohostError('field_invalid', field) @@ -179,13 +192,7 @@ def user_create( raise YunohostValidationError("system_username_exists") main_domain = _get_maindomain() - aliases = [ - "root@" + main_domain, - "admin@" + main_domain, - "webmaster@" + main_domain, - "postmaster@" + main_domain, - "abuse@" + main_domain, - ] + aliases = [alias + main_domain for alias in FIRST_ALIASES] if mail in aliases: raise YunohostValidationError("mail_unavailable") @@ -416,13 +423,8 @@ def user_update( if mail: main_domain = _get_maindomain() - aliases = [ - "root@" + main_domain, - "admin@" + main_domain, - "webmaster@" + main_domain, - "postmaster@" + main_domain, - 'abuse@' + main_domain, - ] + aliases = [alias + main_domain for alias in FIRST_ALIASES] + if mail in user['mail']: user['mail'].remove(mail) else: @@ -606,23 +608,23 @@ def user_export(): Export users into CSV Keyword argument: - csv -- CSV file with columns username;firstname;lastname;password;mailbox_quota;mail;alias;forward;groups + csv -- CSV file with columns username;firstname;lastname;password;mailbox-quota;mail;mail-alias;mail-forward;groups """ - import csv # CSV are needed only in this function + import csv # CSV are needed only in this function from io import BytesIO - fieldnames = [u'username', u'firstname', u'lastname', u'password', u'mailbox-quota', u'mail', u'mail-alias', u'mail-forward', u'groups'] with BytesIO() as csv_io: - writer = csv.DictWriter(csv_io, fieldnames, delimiter=';', quotechar='"') + writer = csv.DictWriter(csv_io, CSV_FIELDNAMES, + delimiter=';', quotechar='"') writer.writeheader() - users = user_list(fieldnames)['users'] + users = user_list(CSV_FIELDNAMES)['users'] for username, user in users.items(): user['mail-alias'] = ','.join(user['mail-alias']) user['mail-forward'] = ','.join(user['mail-forward']) user['groups'] = ','.join(user['groups']) writer.writerow(user) - body = csv_io.getvalue() + body = csv_io.getvalue().rstrip() if msettings.get('interface') == 'api': # We return a raw bottle HTTPresponse (instead of serializable data like # list/dict, ...), which is gonna be picked and used directly by moulinette @@ -631,12 +633,12 @@ def user_export(): headers={ "Content-Disposition": "attachment; filename='users.csv'", "Content-Type": "text/csv", - } - ) + }) return response else: return body + @is_unit_operation() def user_import(operation_logger, csvfile, update=False, delete=False): """ @@ -657,17 +659,6 @@ def user_import(operation_logger, csvfile, update=False, delete=False): 'deleted': [] } is_well_formatted = True - validators = { - 'username': r'^[a-z0-9_]+$', - 'firstname': r'^([^\W\d_]{2,30}[ ,.\'-]{0,3})+$', #FIXME Merge first and lastname and support more name (arabish, chinese...) - 'lastname': r'^([^\W\d_]{2,30}[ ,.\'-]{0,3})+$', - 'password': r'^|(.{3,})$', - 'mail': r'^([\w.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}))$', - 'mail-alias': r'^|([\w.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}),?)+$', - 'mail-forward': r'^|([\w\+.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}),?)+$', - 'mailbox-quota': r'^(\d+[bkMGT])|0$', - 'groups': r'^|([a-z0-9_]+(,?[a-z0-9_]+)*)$' - } def to_list(str_list): return str_list.split(',') if str_list else [] @@ -679,7 +670,7 @@ def user_import(operation_logger, csvfile, update=False, delete=False): # Validation try: format_errors = [key + ':' + str(user[key]) - for key, validator in validators.items() + for key, validator in VALIDATORS.items() if user[key] is None or not re.match(validator, user[key])] except KeyError, e: logger.error(m18n.n('user_import_missing_column', @@ -744,10 +735,10 @@ def user_import(operation_logger, csvfile, update=False, delete=False): remove_forward = None if info: user['mail'] = None if info['mail'] == user['mail'] else user['mail'] - remove_alias = list(set(info['mail-aliases']) - set(user['mail-alias'])) + remove_alias = list(set(info['mail-alias']) - set(user['mail-alias'])) remove_forward = list(set(info['mail-forward']) - set(user['mail-forward'])) - user['alias'] = list(set(user['alias']) - set(info['mail-aliases'])) - user['forward'] = list(set(user['forward']) - set(info['mail-forward'])) + user['mail-alias'] = list(set(user['mail-alias']) - set(info['mail-alias'])) + user['mail-forward'] = list(set(user['mail-forward']) - set(info['mail-forward'])) for group, infos in user_group_list()["groups"].items(): if group == "all_users": continue @@ -768,7 +759,7 @@ def user_import(operation_logger, csvfile, update=False, delete=False): for group in user['groups']: user_group_update(group, add=user['username'], sync_perm=False, imported=True) - users = user_list()['users'] + users = user_list(CSV_FIELDNAMES)['users'] operation_logger.start() # We do delete and update before to avoid mail uniqueness issues for user in actions['deleted']: