mirror of
https://github.com/YunoHost/yunohost.git
synced 2024-09-03 20:06:10 +02:00
[fix] Import user with update mode some unit test
This commit is contained in:
parent
9e2f4a56f3
commit
fd06430e8f
2 changed files with 94 additions and 39 deletions
|
@ -1,5 +1,6 @@
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
from .conftest import message, raiseYunohostError
|
from .conftest import message, raiseYunohostError
|
||||||
|
|
||||||
from yunohost.user import (
|
from yunohost.user import (
|
||||||
|
@ -8,6 +9,10 @@ from yunohost.user import (
|
||||||
user_create,
|
user_create,
|
||||||
user_delete,
|
user_delete,
|
||||||
user_update,
|
user_update,
|
||||||
|
user_import,
|
||||||
|
user_export,
|
||||||
|
CSV_FIELDNAMES,
|
||||||
|
FIRST_ALIASES,
|
||||||
user_group_list,
|
user_group_list,
|
||||||
user_group_create,
|
user_group_create,
|
||||||
user_group_delete,
|
user_group_delete,
|
||||||
|
@ -110,6 +115,65 @@ def test_del_user(mocker):
|
||||||
assert "alice" not in group_res["all_users"]["members"]
|
assert "alice" not in group_res["all_users"]["members"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_import_user(mocker):
|
||||||
|
import csv
|
||||||
|
from io import BytesIO
|
||||||
|
fieldnames = [u'username', u'firstname', u'lastname', u'password',
|
||||||
|
u'mailbox-quota', u'mail', u'mail-alias', u'mail-forward',
|
||||||
|
u'groups']
|
||||||
|
with BytesIO() as csv_io:
|
||||||
|
writer = csv.DictWriter(csv_io, fieldnames, delimiter=';',
|
||||||
|
quotechar='"')
|
||||||
|
writer.writeheader()
|
||||||
|
writer.writerow({
|
||||||
|
'username': "albert",
|
||||||
|
'firstname': "Albert",
|
||||||
|
'lastname': "Good",
|
||||||
|
'password': "",
|
||||||
|
'mailbox-quota': "1G",
|
||||||
|
'mail': "albert@" + maindomain,
|
||||||
|
'mail-alias': "albert2@" + maindomain,
|
||||||
|
'mail-forward': "albert@example.com",
|
||||||
|
'groups': "dev",
|
||||||
|
})
|
||||||
|
writer.writerow({
|
||||||
|
'username': "alice",
|
||||||
|
'firstname': "Alice",
|
||||||
|
'lastname': "White",
|
||||||
|
'password': "",
|
||||||
|
'mailbox-quota': "1G",
|
||||||
|
'mail': "alice@" + maindomain,
|
||||||
|
'mail-alias': "alice1@" + maindomain + ",alice2@" + maindomain,
|
||||||
|
'mail-forward': "",
|
||||||
|
'groups': "apps",
|
||||||
|
})
|
||||||
|
csv_io.seek(0)
|
||||||
|
with message(mocker, "user_import_success"):
|
||||||
|
user_import(csv_io, update=True, delete=True)
|
||||||
|
|
||||||
|
group_res = user_group_list()['groups']
|
||||||
|
user_res = user_list(CSV_FIELDNAMES)['users']
|
||||||
|
assert "albert" in user_res
|
||||||
|
assert "alice" in user_res
|
||||||
|
assert "bob" not in user_res
|
||||||
|
assert len(user_res['alice']['mail-alias']) == 2
|
||||||
|
assert "albert" in group_res['dev']['members']
|
||||||
|
assert "alice" in group_res['apps']['members']
|
||||||
|
|
||||||
|
|
||||||
|
def test_export_user(mocker):
|
||||||
|
result = user_export()
|
||||||
|
should_be = "username;firstname;lastname;password;"
|
||||||
|
should_be += "mailbox-quota;mail;mail-alias;mail-forward;groups"
|
||||||
|
should_be += "\r\nbob;Bob;Snow;;0;bob@" + maindomain + ";;;apps"
|
||||||
|
should_be += "\r\nalice;Alice;White;;0;alice@" + maindomain + ";"
|
||||||
|
should_be += ','.join([alias + maindomain for alias in FIRST_ALIASES])
|
||||||
|
should_be += ";;dev"
|
||||||
|
should_be += "\r\njack;Jack;Black;;0;jack@" + maindomain + ";;;"
|
||||||
|
|
||||||
|
assert result == should_be
|
||||||
|
|
||||||
|
|
||||||
def test_create_group(mocker):
|
def test_create_group(mocker):
|
||||||
|
|
||||||
with message(mocker, "group_created", group="adminsys"):
|
with message(mocker, "group_created", group="adminsys"):
|
||||||
|
|
|
@ -43,6 +43,19 @@ from yunohost.log import is_unit_operation
|
||||||
|
|
||||||
logger = getActionLogger("yunohost.user")
|
logger = getActionLogger("yunohost.user")
|
||||||
|
|
||||||
|
CSV_FIELDNAMES = [u'username', u'firstname', u'lastname', u'password', u'mailbox-quota', u'mail', u'mail-alias', u'mail-forward', u'groups']
|
||||||
|
VALIDATORS = {
|
||||||
|
'username': r'^[a-z0-9_]+$',
|
||||||
|
'firstname': r'^([^\W\d_]{2,30}[ ,.\'-]{0,3})+$', #FIXME Merge first and lastname and support more name (arabish, chinese...)
|
||||||
|
'lastname': r'^([^\W\d_]{2,30}[ ,.\'-]{0,3})+$',
|
||||||
|
'password': r'^|(.{3,})$',
|
||||||
|
'mail': r'^([\w.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}))$',
|
||||||
|
'mail-alias': r'^|([\w.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}),?)+$',
|
||||||
|
'mail-forward': r'^|([\w\+.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}),?)+$',
|
||||||
|
'mailbox-quota': r'^(\d+[bkMGT])|0$',
|
||||||
|
'groups': r'^|([a-z0-9_]+(,?[a-z0-9_]+)*)$'
|
||||||
|
}
|
||||||
|
FIRST_ALIASES = ['root@', 'admin@', 'webmaster@', 'postmaster@', 'abuse@']
|
||||||
|
|
||||||
def user_list(fields=None):
|
def user_list(fields=None):
|
||||||
|
|
||||||
|
@ -87,7 +100,7 @@ def user_list(fields=None):
|
||||||
|
|
||||||
for field in fields:
|
for field in fields:
|
||||||
if field in ldap_attrs:
|
if field in ldap_attrs:
|
||||||
attrs|=set([ldap_attrs[field]])
|
attrs |= set([ldap_attrs[field]])
|
||||||
else:
|
else:
|
||||||
raise YunohostError('field_invalid', field)
|
raise YunohostError('field_invalid', field)
|
||||||
|
|
||||||
|
@ -179,13 +192,7 @@ def user_create(
|
||||||
raise YunohostValidationError("system_username_exists")
|
raise YunohostValidationError("system_username_exists")
|
||||||
|
|
||||||
main_domain = _get_maindomain()
|
main_domain = _get_maindomain()
|
||||||
aliases = [
|
aliases = [alias + main_domain for alias in FIRST_ALIASES]
|
||||||
"root@" + main_domain,
|
|
||||||
"admin@" + main_domain,
|
|
||||||
"webmaster@" + main_domain,
|
|
||||||
"postmaster@" + main_domain,
|
|
||||||
"abuse@" + main_domain,
|
|
||||||
]
|
|
||||||
|
|
||||||
if mail in aliases:
|
if mail in aliases:
|
||||||
raise YunohostValidationError("mail_unavailable")
|
raise YunohostValidationError("mail_unavailable")
|
||||||
|
@ -416,13 +423,8 @@ def user_update(
|
||||||
|
|
||||||
if mail:
|
if mail:
|
||||||
main_domain = _get_maindomain()
|
main_domain = _get_maindomain()
|
||||||
aliases = [
|
aliases = [alias + main_domain for alias in FIRST_ALIASES]
|
||||||
"root@" + main_domain,
|
|
||||||
"admin@" + main_domain,
|
|
||||||
"webmaster@" + main_domain,
|
|
||||||
"postmaster@" + main_domain,
|
|
||||||
'abuse@' + main_domain,
|
|
||||||
]
|
|
||||||
if mail in user['mail']:
|
if mail in user['mail']:
|
||||||
user['mail'].remove(mail)
|
user['mail'].remove(mail)
|
||||||
else:
|
else:
|
||||||
|
@ -606,23 +608,23 @@ def user_export():
|
||||||
Export users into CSV
|
Export users into CSV
|
||||||
|
|
||||||
Keyword argument:
|
Keyword argument:
|
||||||
csv -- CSV file with columns username;firstname;lastname;password;mailbox_quota;mail;alias;forward;groups
|
csv -- CSV file with columns username;firstname;lastname;password;mailbox-quota;mail;mail-alias;mail-forward;groups
|
||||||
|
|
||||||
"""
|
"""
|
||||||
import csv # CSV are needed only in this function
|
import csv # CSV are needed only in this function
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
fieldnames = [u'username', u'firstname', u'lastname', u'password', u'mailbox-quota', u'mail', u'mail-alias', u'mail-forward', u'groups']
|
|
||||||
with BytesIO() as csv_io:
|
with BytesIO() as csv_io:
|
||||||
writer = csv.DictWriter(csv_io, fieldnames, delimiter=';', quotechar='"')
|
writer = csv.DictWriter(csv_io, CSV_FIELDNAMES,
|
||||||
|
delimiter=';', quotechar='"')
|
||||||
writer.writeheader()
|
writer.writeheader()
|
||||||
users = user_list(fieldnames)['users']
|
users = user_list(CSV_FIELDNAMES)['users']
|
||||||
for username, user in users.items():
|
for username, user in users.items():
|
||||||
user['mail-alias'] = ','.join(user['mail-alias'])
|
user['mail-alias'] = ','.join(user['mail-alias'])
|
||||||
user['mail-forward'] = ','.join(user['mail-forward'])
|
user['mail-forward'] = ','.join(user['mail-forward'])
|
||||||
user['groups'] = ','.join(user['groups'])
|
user['groups'] = ','.join(user['groups'])
|
||||||
writer.writerow(user)
|
writer.writerow(user)
|
||||||
|
|
||||||
body = csv_io.getvalue()
|
body = csv_io.getvalue().rstrip()
|
||||||
if msettings.get('interface') == 'api':
|
if msettings.get('interface') == 'api':
|
||||||
# We return a raw bottle HTTPresponse (instead of serializable data like
|
# We return a raw bottle HTTPresponse (instead of serializable data like
|
||||||
# list/dict, ...), which is gonna be picked and used directly by moulinette
|
# list/dict, ...), which is gonna be picked and used directly by moulinette
|
||||||
|
@ -631,12 +633,12 @@ def user_export():
|
||||||
headers={
|
headers={
|
||||||
"Content-Disposition": "attachment; filename='users.csv'",
|
"Content-Disposition": "attachment; filename='users.csv'",
|
||||||
"Content-Type": "text/csv",
|
"Content-Type": "text/csv",
|
||||||
}
|
})
|
||||||
)
|
|
||||||
return response
|
return response
|
||||||
else:
|
else:
|
||||||
return body
|
return body
|
||||||
|
|
||||||
|
|
||||||
@is_unit_operation()
|
@is_unit_operation()
|
||||||
def user_import(operation_logger, csvfile, update=False, delete=False):
|
def user_import(operation_logger, csvfile, update=False, delete=False):
|
||||||
"""
|
"""
|
||||||
|
@ -657,17 +659,6 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
|
||||||
'deleted': []
|
'deleted': []
|
||||||
}
|
}
|
||||||
is_well_formatted = True
|
is_well_formatted = True
|
||||||
validators = {
|
|
||||||
'username': r'^[a-z0-9_]+$',
|
|
||||||
'firstname': r'^([^\W\d_]{2,30}[ ,.\'-]{0,3})+$', #FIXME Merge first and lastname and support more name (arabish, chinese...)
|
|
||||||
'lastname': r'^([^\W\d_]{2,30}[ ,.\'-]{0,3})+$',
|
|
||||||
'password': r'^|(.{3,})$',
|
|
||||||
'mail': r'^([\w.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}))$',
|
|
||||||
'mail-alias': r'^|([\w.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}),?)+$',
|
|
||||||
'mail-forward': r'^|([\w\+.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}),?)+$',
|
|
||||||
'mailbox-quota': r'^(\d+[bkMGT])|0$',
|
|
||||||
'groups': r'^|([a-z0-9_]+(,?[a-z0-9_]+)*)$'
|
|
||||||
}
|
|
||||||
|
|
||||||
def to_list(str_list):
|
def to_list(str_list):
|
||||||
return str_list.split(',') if str_list else []
|
return str_list.split(',') if str_list else []
|
||||||
|
@ -679,7 +670,7 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
|
||||||
# Validation
|
# Validation
|
||||||
try:
|
try:
|
||||||
format_errors = [key + ':' + str(user[key])
|
format_errors = [key + ':' + str(user[key])
|
||||||
for key, validator in validators.items()
|
for key, validator in VALIDATORS.items()
|
||||||
if user[key] is None or not re.match(validator, user[key])]
|
if user[key] is None or not re.match(validator, user[key])]
|
||||||
except KeyError, e:
|
except KeyError, e:
|
||||||
logger.error(m18n.n('user_import_missing_column',
|
logger.error(m18n.n('user_import_missing_column',
|
||||||
|
@ -744,10 +735,10 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
|
||||||
remove_forward = None
|
remove_forward = None
|
||||||
if info:
|
if info:
|
||||||
user['mail'] = None if info['mail'] == user['mail'] else user['mail']
|
user['mail'] = None if info['mail'] == user['mail'] else user['mail']
|
||||||
remove_alias = list(set(info['mail-aliases']) - set(user['mail-alias']))
|
remove_alias = list(set(info['mail-alias']) - set(user['mail-alias']))
|
||||||
remove_forward = list(set(info['mail-forward']) - set(user['mail-forward']))
|
remove_forward = list(set(info['mail-forward']) - set(user['mail-forward']))
|
||||||
user['alias'] = list(set(user['alias']) - set(info['mail-aliases']))
|
user['mail-alias'] = list(set(user['mail-alias']) - set(info['mail-alias']))
|
||||||
user['forward'] = list(set(user['forward']) - set(info['mail-forward']))
|
user['mail-forward'] = list(set(user['mail-forward']) - set(info['mail-forward']))
|
||||||
for group, infos in user_group_list()["groups"].items():
|
for group, infos in user_group_list()["groups"].items():
|
||||||
if group == "all_users":
|
if group == "all_users":
|
||||||
continue
|
continue
|
||||||
|
@ -768,7 +759,7 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
|
||||||
for group in user['groups']:
|
for group in user['groups']:
|
||||||
user_group_update(group, add=user['username'], sync_perm=False, imported=True)
|
user_group_update(group, add=user['username'], sync_perm=False, imported=True)
|
||||||
|
|
||||||
users = user_list()['users']
|
users = user_list(CSV_FIELDNAMES)['users']
|
||||||
operation_logger.start()
|
operation_logger.start()
|
||||||
# We do delete and update before to avoid mail uniqueness issues
|
# We do delete and update before to avoid mail uniqueness issues
|
||||||
for user in actions['deleted']:
|
for user in actions['deleted']:
|
||||||
|
|
Loading…
Add table
Reference in a new issue