Merge pull request #1311 from YunoHost/ci-format-dev

[CI] Format code
This commit is contained in:
Alexandre Aubin 2021-09-02 17:13:23 +02:00 committed by GitHub
commit 253eb6575c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 269 additions and 201 deletions

View file

@ -382,7 +382,7 @@ def is_unit_operation(
try: try:
context[field] = value.name context[field] = value.name
except: except:
context[field] = 'IOBase' context[field] = "IOBase"
operation_logger = OperationLogger(op_key, related_to, args=context) operation_logger = OperationLogger(op_key, related_to, args=context)
try: try:

View file

@ -117,53 +117,65 @@ def test_del_user(mocker):
def test_import_user(mocker): def test_import_user(mocker):
import csv import csv
from io import StringIO from io import StringIO
fieldnames = [u'username', u'firstname', u'lastname', u'password',
u'mailbox-quota', u'mail', u'mail-alias', u'mail-forward', fieldnames = [
u'groups'] "username",
"firstname",
"lastname",
"password",
"mailbox-quota",
"mail",
"mail-alias",
"mail-forward",
"groups",
]
with StringIO() as csv_io: with StringIO() as csv_io:
writer = csv.DictWriter(csv_io, fieldnames, delimiter=';', writer = csv.DictWriter(csv_io, fieldnames, delimiter=";", quotechar='"')
quotechar='"')
writer.writeheader() writer.writeheader()
writer.writerow({ writer.writerow(
'username': "albert", {
'firstname': "Albert", "username": "albert",
'lastname': "Good", "firstname": "Albert",
'password': "", "lastname": "Good",
'mailbox-quota': "1G", "password": "",
'mail': "albert@" + maindomain, "mailbox-quota": "1G",
'mail-alias': "albert2@" + maindomain, "mail": "albert@" + maindomain,
'mail-forward': "albert@example.com", "mail-alias": "albert2@" + maindomain,
'groups': "dev", "mail-forward": "albert@example.com",
}) "groups": "dev",
writer.writerow({ }
'username': "alice", )
'firstname': "Alice", writer.writerow(
'lastname': "White", {
'password': "", "username": "alice",
'mailbox-quota': "1G", "firstname": "Alice",
'mail': "alice@" + maindomain, "lastname": "White",
'mail-alias': "alice1@" + maindomain + ",alice2@" + maindomain, "password": "",
'mail-forward': "", "mailbox-quota": "1G",
'groups': "apps", "mail": "alice@" + maindomain,
}) "mail-alias": "alice1@" + maindomain + ",alice2@" + maindomain,
"mail-forward": "",
"groups": "apps",
}
)
csv_io.seek(0) csv_io.seek(0)
with message(mocker, "user_import_success"): with message(mocker, "user_import_success"):
user_import(csv_io, update=True, delete=True) user_import(csv_io, update=True, delete=True)
group_res = user_group_list()['groups'] group_res = user_group_list()["groups"]
user_res = user_list(list(FIELDS_FOR_IMPORT.keys()))['users'] user_res = user_list(list(FIELDS_FOR_IMPORT.keys()))["users"]
assert "albert" in user_res assert "albert" in user_res
assert "alice" in user_res assert "alice" in user_res
assert "bob" not in user_res assert "bob" not in user_res
assert len(user_res['alice']['mail-alias']) == 2 assert len(user_res["alice"]["mail-alias"]) == 2
assert "albert" in group_res['dev']['members'] assert "albert" in group_res["dev"]["members"]
assert "alice" in group_res['apps']['members'] assert "alice" in group_res["apps"]["members"]
assert "alice" not in group_res['dev']['members'] assert "alice" not in group_res["dev"]["members"]
def test_export_user(mocker): def test_export_user(mocker):
result = user_export() result = user_export()
aliases = ','.join([alias + maindomain for alias in FIRST_ALIASES]) aliases = ",".join([alias + maindomain for alias in FIRST_ALIASES])
should_be = ( should_be = (
"username;firstname;lastname;password;mail;mail-alias;mail-forward;mailbox-quota;groups\r\n" "username;firstname;lastname;password;mail;mail-alias;mail-forward;mailbox-quota;groups\r\n"
f"alice;Alice;White;;alice@{maindomain};{aliases};;0;dev\r\n" f"alice;Alice;White;;alice@{maindomain};{aliases};;0;dev\r\n"

View file

@ -44,18 +44,18 @@ from yunohost.log import is_unit_operation
logger = getActionLogger("yunohost.user") logger = getActionLogger("yunohost.user")
FIELDS_FOR_IMPORT = { FIELDS_FOR_IMPORT = {
'username': r'^[a-z0-9_]+$', "username": r"^[a-z0-9_]+$",
'firstname': r'^([^\W\d_]{1,30}[ ,.\'-]{0,3})+$', "firstname": r"^([^\W\d_]{1,30}[ ,.\'-]{0,3})+$",
'lastname': r'^([^\W\d_]{1,30}[ ,.\'-]{0,3})+$', "lastname": r"^([^\W\d_]{1,30}[ ,.\'-]{0,3})+$",
'password': r'^|(.{3,})$', "password": r"^|(.{3,})$",
'mail': r'^([\w.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}))$', "mail": r"^([\w.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}))$",
'mail-alias': r'^|([\w.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}),?)+$', "mail-alias": r"^|([\w.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}),?)+$",
'mail-forward': r'^|([\w\+.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}),?)+$', "mail-forward": r"^|([\w\+.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}),?)+$",
'mailbox-quota': r'^(\d+[bkMGT])|0|$', "mailbox-quota": r"^(\d+[bkMGT])|0|$",
'groups': r'^|([a-z0-9_]+(,?[a-z0-9_]+)*)$' "groups": r"^|([a-z0-9_]+(,?[a-z0-9_]+)*)$",
} }
FIRST_ALIASES = ['root@', 'admin@', 'webmaster@', 'postmaster@', 'abuse@'] FIRST_ALIASES = ["root@", "admin@", "webmaster@", "postmaster@", "abuse@"]
def user_list(fields=None): def user_list(fields=None):
@ -63,47 +63,51 @@ def user_list(fields=None):
from yunohost.utils.ldap import _get_ldap_interface from yunohost.utils.ldap import _get_ldap_interface
ldap_attrs = { ldap_attrs = {
'username': 'uid', "username": "uid",
'password': '', # We can't request password in ldap "password": "", # We can't request password in ldap
'fullname': 'cn', "fullname": "cn",
'firstname': 'givenName', "firstname": "givenName",
'lastname': 'sn', "lastname": "sn",
'mail': 'mail', "mail": "mail",
'mail-alias': 'mail', "mail-alias": "mail",
'mail-forward': 'maildrop', "mail-forward": "maildrop",
'mailbox-quota': 'mailuserquota', "mailbox-quota": "mailuserquota",
'groups': 'memberOf', "groups": "memberOf",
'shell': 'loginShell', "shell": "loginShell",
'home-path': 'homeDirectory' "home-path": "homeDirectory",
} }
def display_default(values, _): def display_default(values, _):
return values[0] if len(values) == 1 else values return values[0] if len(values) == 1 else values
display = { display = {
'password': lambda values, user: '', "password": lambda values, user: "",
'mail': lambda values, user: display_default(values[:1], user), "mail": lambda values, user: display_default(values[:1], user),
'mail-alias': lambda values, _: values[1:], "mail-alias": lambda values, _: values[1:],
'mail-forward': lambda values, user: [forward for forward in values if forward != user['uid'][0]], "mail-forward": lambda values, user: [
'groups': lambda values, user: [ forward for forward in values if forward != user["uid"][0]
group[3:].split(',')[0] ],
"groups": lambda values, user: [
group[3:].split(",")[0]
for group in values for group in values
if not group.startswith('cn=all_users,') and if not group.startswith("cn=all_users,")
not group.startswith('cn=' + user['uid'][0] + ',')], and not group.startswith("cn=" + user["uid"][0] + ",")
'shell': lambda values, _: len(values) > 0 and values[0].strip() == "/bin/false" ],
"shell": lambda values, _: len(values) > 0
and values[0].strip() == "/bin/false",
} }
attrs = set(['uid']) attrs = set(["uid"])
users = {} users = {}
if not fields: if not fields:
fields = ['username', 'fullname', 'mail', 'mailbox-quota'] fields = ["username", "fullname", "mail", "mailbox-quota"]
for field in fields: for field in fields:
if field in ldap_attrs: if field in ldap_attrs:
attrs.add(ldap_attrs[field]) attrs.add(ldap_attrs[field])
else: else:
raise YunohostError('field_invalid', field) raise YunohostError("field_invalid", field)
ldap = _get_ldap_interface() ldap = _get_ldap_interface()
result = ldap.search( result = ldap.search(
@ -120,7 +124,7 @@ def user_list(fields=None):
values = user[ldap_attrs[field]] values = user[ldap_attrs[field]]
entry[field] = display.get(field, display_default)(values, user) entry[field] = display.get(field, display_default)(values, user)
users[user['uid'][0]] = entry users[user["uid"][0]] = entry
return {"users": users} return {"users": users}
@ -135,7 +139,7 @@ def user_create(
password, password,
mailbox_quota="0", mailbox_quota="0",
mail=None, mail=None,
from_import=False from_import=False,
): ):
from yunohost.domain import domain_list, _get_maindomain from yunohost.domain import domain_list, _get_maindomain
@ -253,10 +257,9 @@ def user_create(
# Attempt to create user home folder # Attempt to create user home folder
subprocess.check_call(["mkhomedir_helper", username]) subprocess.check_call(["mkhomedir_helper", username])
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
home = f'/home/{username}' home = f"/home/{username}"
if not os.path.isdir(home): if not os.path.isdir(home):
logger.warning(m18n.n('user_home_creation_failed', home=home), logger.warning(m18n.n("user_home_creation_failed", home=home), exc_info=1)
exc_info=1)
try: try:
subprocess.check_call( subprocess.check_call(
@ -282,12 +285,12 @@ def user_create(
# TODO: Send a welcome mail to user # TODO: Send a welcome mail to user
if not from_import: if not from_import:
logger.success(m18n.n('user_created')) logger.success(m18n.n("user_created"))
return {"fullname": fullname, "username": username, "mail": mail} return {"fullname": fullname, "username": username, "mail": mail}
@is_unit_operation([('username', 'user')]) @is_unit_operation([("username", "user")])
def user_delete(operation_logger, username, purge=False, from_import=False): def user_delete(operation_logger, username, purge=False, from_import=False):
""" """
Delete user Delete user
@ -331,13 +334,14 @@ def user_delete(operation_logger, username, purge=False, from_import=False):
subprocess.call(["nscd", "-i", "passwd"]) subprocess.call(["nscd", "-i", "passwd"])
if purge: if purge:
subprocess.call(['rm', '-rf', '/home/{0}'.format(username)]) subprocess.call(["rm", "-rf", "/home/{0}".format(username)])
subprocess.call(['rm', '-rf', '/var/mail/{0}'.format(username)]) subprocess.call(["rm", "-rf", "/var/mail/{0}".format(username)])
hook_callback('post_user_delete', args=[username, purge]) hook_callback("post_user_delete", args=[username, purge])
if not from_import: if not from_import:
logger.success(m18n.n('user_deleted')) logger.success(m18n.n("user_deleted"))
@is_unit_operation([("username", "user")], exclude=["change_password"]) @is_unit_operation([("username", "user")], exclude=["change_password"])
def user_update( def user_update(
@ -352,7 +356,7 @@ def user_update(
add_mailalias=None, add_mailalias=None,
remove_mailalias=None, remove_mailalias=None,
mailbox_quota=None, mailbox_quota=None,
from_import=False from_import=False,
): ):
""" """
Update user informations Update user informations
@ -412,7 +416,7 @@ def user_update(
] ]
# change_password is None if user_update is not called to change the password # change_password is None if user_update is not called to change the password
if change_password is not None and change_password != '': if change_password is not None and change_password != "":
# when in the cli interface if the option to change the password is called # when in the cli interface if the option to change the password is called
# without a specified value, change_password will be set to the const 0. # without a specified value, change_password will be set to the const 0.
# In this case we prompt for the new password. # In this case we prompt for the new password.
@ -429,20 +433,22 @@ def user_update(
aliases = [alias + main_domain for alias in FIRST_ALIASES] aliases = [alias + main_domain for alias in FIRST_ALIASES]
# If the requested mail address is already as main address or as an alias by this user # If the requested mail address is already as main address or as an alias by this user
if mail in user['mail']: if mail in user["mail"]:
user['mail'].remove(mail) user["mail"].remove(mail)
# Othewise, check that this mail address is not already used by this user # Othewise, check that this mail address is not already used by this user
else: else:
try: try:
ldap.validate_uniqueness({'mail': mail}) ldap.validate_uniqueness({"mail": mail})
except Exception as e: except Exception as e:
raise YunohostError('user_update_failed', user=username, error=e) raise YunohostError("user_update_failed", user=username, error=e)
if mail[mail.find('@') + 1:] not in domains: if mail[mail.find("@") + 1 :] not in domains:
raise YunohostError('mail_domain_unknown', domain=mail[mail.find('@') + 1:]) raise YunohostError(
"mail_domain_unknown", domain=mail[mail.find("@") + 1 :]
)
if mail in aliases: if mail in aliases:
raise YunohostValidationError("mail_unavailable") raise YunohostValidationError("mail_unavailable")
new_attr_dict['mail'] = [mail] + user['mail'][1:] new_attr_dict["mail"] = [mail] + user["mail"][1:]
if add_mailalias: if add_mailalias:
if not isinstance(add_mailalias, list): if not isinstance(add_mailalias, list):
@ -455,12 +461,10 @@ def user_update(
try: try:
ldap.validate_uniqueness({"mail": mail}) ldap.validate_uniqueness({"mail": mail})
except Exception as e: except Exception as e:
raise YunohostError( raise YunohostError("user_update_failed", user=username, error=e)
"user_update_failed", user=username, error=e if mail[mail.find("@") + 1 :] not in domains:
)
if mail[mail.find("@") + 1:] not in domains:
raise YunohostError( raise YunohostError(
"mail_domain_unknown", domain=mail[mail.find("@") + 1:] "mail_domain_unknown", domain=mail[mail.find("@") + 1 :]
) )
user["mail"].append(mail) user["mail"].append(mail)
new_attr_dict["mail"] = user["mail"] new_attr_dict["mail"] = user["mail"]
@ -517,7 +521,7 @@ def user_update(
if not from_import: if not from_import:
app_ssowatconf() app_ssowatconf()
logger.success(m18n.n('user_updated')) logger.success(m18n.n("user_updated"))
return user_info(username) return user_info(username)
@ -548,13 +552,13 @@ def user_info(username):
raise YunohostValidationError("user_unknown", user=username) raise YunohostValidationError("user_unknown", user=username)
result_dict = { result_dict = {
'username': user['uid'][0], "username": user["uid"][0],
'fullname': user['cn'][0], "fullname": user["cn"][0],
'firstname': user['givenName'][0], "firstname": user["givenName"][0],
'lastname': user['sn'][0], "lastname": user["sn"][0],
'mail': user['mail'][0], "mail": user["mail"][0],
'mail-aliases': [], "mail-aliases": [],
'mail-forward': [] "mail-forward": [],
} }
if len(user["mail"]) > 1: if len(user["mail"]) > 1:
@ -619,27 +623,32 @@ def user_export():
""" """
import csv # CSV are needed only in this function import csv # CSV are needed only in this function
from io import StringIO from io import StringIO
with StringIO() as csv_io: with StringIO() as csv_io:
writer = csv.DictWriter(csv_io, list(FIELDS_FOR_IMPORT.keys()), writer = csv.DictWriter(
delimiter=';', quotechar='"') csv_io, list(FIELDS_FOR_IMPORT.keys()), delimiter=";", quotechar='"'
)
writer.writeheader() writer.writeheader()
users = user_list(list(FIELDS_FOR_IMPORT.keys()))['users'] users = user_list(list(FIELDS_FOR_IMPORT.keys()))["users"]
for username, user in users.items(): for username, user in users.items():
user['mail-alias'] = ','.join(user['mail-alias']) user["mail-alias"] = ",".join(user["mail-alias"])
user['mail-forward'] = ','.join(user['mail-forward']) user["mail-forward"] = ",".join(user["mail-forward"])
user['groups'] = ','.join(user['groups']) user["groups"] = ",".join(user["groups"])
writer.writerow(user) writer.writerow(user)
body = csv_io.getvalue().rstrip() body = csv_io.getvalue().rstrip()
if Moulinette.interface.type == 'api': if Moulinette.interface.type == "api":
# We return a raw bottle HTTPresponse (instead of serializable data like # We return a raw bottle HTTPresponse (instead of serializable data like
# list/dict, ...), which is gonna be picked and used directly by moulinette # list/dict, ...), which is gonna be picked and used directly by moulinette
from bottle import HTTPResponse from bottle import HTTPResponse
response = HTTPResponse(body=body,
headers={ response = HTTPResponse(
"Content-Disposition": "attachment; filename=users.csv", body=body,
"Content-Type": "text/csv", headers={
}) "Content-Disposition": "attachment; filename=users.csv",
"Content-Type": "text/csv",
},
)
return response return response
else: else:
return body return body
@ -662,106 +671,121 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
from yunohost.domain import domain_list from yunohost.domain import domain_list
# Pre-validate data and prepare what should be done # Pre-validate data and prepare what should be done
actions = { actions = {"created": [], "updated": [], "deleted": []}
'created': [],
'updated': [],
'deleted': []
}
is_well_formatted = True is_well_formatted = True
def to_list(str_list): def to_list(str_list):
L = str_list.split(',') if str_list else [] L = str_list.split(",") if str_list else []
L = [l.strip() for l in L] L = [l.strip() for l in L]
return L return L
existing_users = user_list()['users'] existing_users = user_list()["users"]
existing_groups = user_group_list()["groups"] existing_groups = user_group_list()["groups"]
existing_domains = domain_list()["domains"] existing_domains = domain_list()["domains"]
reader = csv.DictReader(csvfile, delimiter=';', quotechar='"') reader = csv.DictReader(csvfile, delimiter=";", quotechar='"')
users_in_csv = [] users_in_csv = []
missing_columns = [key for key in FIELDS_FOR_IMPORT.keys() if key not in reader.fieldnames] missing_columns = [
key for key in FIELDS_FOR_IMPORT.keys() if key not in reader.fieldnames
]
if missing_columns: if missing_columns:
raise YunohostValidationError("user_import_missing_columns", columns=', '.join(missing_columns)) raise YunohostValidationError(
"user_import_missing_columns", columns=", ".join(missing_columns)
)
for user in reader: for user in reader:
# Validate column values against regexes # Validate column values against regexes
format_errors = [f"{key}: '{user[key]}' doesn't match the expected format" format_errors = [
for key, validator in FIELDS_FOR_IMPORT.items() f"{key}: '{user[key]}' doesn't match the expected format"
if user[key] is None or not re.match(validator, user[key])] for key, validator in FIELDS_FOR_IMPORT.items()
if user[key] is None or not re.match(validator, user[key])
]
# Check for duplicated username lines # Check for duplicated username lines
if user['username'] in users_in_csv: if user["username"] in users_in_csv:
format_errors.append(f"username '{user['username']}' duplicated") format_errors.append(f"username '{user['username']}' duplicated")
users_in_csv.append(user['username']) users_in_csv.append(user["username"])
# Validate that groups exist # Validate that groups exist
user['groups'] = to_list(user['groups']) user["groups"] = to_list(user["groups"])
unknown_groups = [g for g in user['groups'] if g not in existing_groups] unknown_groups = [g for g in user["groups"] if g not in existing_groups]
if unknown_groups: if unknown_groups:
format_errors.append(f"username '{user['username']}': unknown groups %s" % ', '.join(unknown_groups)) format_errors.append(
f"username '{user['username']}': unknown groups %s"
% ", ".join(unknown_groups)
)
# Validate that domains exist # Validate that domains exist
user['mail-alias'] = to_list(user['mail-alias']) user["mail-alias"] = to_list(user["mail-alias"])
user['mail-forward'] = to_list(user['mail-forward']) user["mail-forward"] = to_list(user["mail-forward"])
user['domain'] = user['mail'].split('@')[1] user["domain"] = user["mail"].split("@")[1]
unknown_domains = [] unknown_domains = []
if user['domain'] not in existing_domains: if user["domain"] not in existing_domains:
unknown_domains.append(user['domain']) unknown_domains.append(user["domain"])
unknown_domains += [mail.split('@', 1)[1] for mail in user['mail-alias'] if mail.split('@', 1)[1] not in existing_domains] unknown_domains += [
mail.split("@", 1)[1]
for mail in user["mail-alias"]
if mail.split("@", 1)[1] not in existing_domains
]
unknown_domains = set(unknown_domains) unknown_domains = set(unknown_domains)
if unknown_domains: if unknown_domains:
format_errors.append(f"username '{user['username']}': unknown domains %s" % ', '.join(unknown_domains)) format_errors.append(
f"username '{user['username']}': unknown domains %s"
% ", ".join(unknown_domains)
)
if format_errors: if format_errors:
logger.error(m18n.n('user_import_bad_line', logger.error(
line=reader.line_num, m18n.n(
details=', '.join(format_errors))) "user_import_bad_line",
line=reader.line_num,
details=", ".join(format_errors),
)
)
is_well_formatted = False is_well_formatted = False
continue continue
# Choose what to do with this line and prepare data # Choose what to do with this line and prepare data
user['mailbox-quota'] = user['mailbox-quota'] or "0" user["mailbox-quota"] = user["mailbox-quota"] or "0"
# User creation # User creation
if user['username'] not in existing_users: if user["username"] not in existing_users:
# Generate password if not exists # Generate password if not exists
# This could be used when reset password will be merged # This could be used when reset password will be merged
if not user['password']: if not user["password"]:
user['password'] = random_ascii(70) user["password"] = random_ascii(70)
actions['created'].append(user) actions["created"].append(user)
# User update # User update
elif update: elif update:
actions['updated'].append(user) actions["updated"].append(user)
if delete: if delete:
actions['deleted'] = [user for user in existing_users if user not in users_in_csv] actions["deleted"] = [
user for user in existing_users if user not in users_in_csv
]
if delete and not users_in_csv: if delete and not users_in_csv:
logger.error("You used the delete option with an empty csv file ... You probably did not really mean to do that, did you !?") logger.error(
"You used the delete option with an empty csv file ... You probably did not really mean to do that, did you !?"
)
is_well_formatted = False is_well_formatted = False
if not is_well_formatted: if not is_well_formatted:
raise YunohostValidationError('user_import_bad_file') raise YunohostValidationError("user_import_bad_file")
total = len(actions['created'] + actions['updated'] + actions['deleted']) total = len(actions["created"] + actions["updated"] + actions["deleted"])
if total == 0: if total == 0:
logger.info(m18n.n('user_import_nothing_to_do')) logger.info(m18n.n("user_import_nothing_to_do"))
return return
# Apply creation, update and deletion operation # Apply creation, update and deletion operation
result = { result = {"created": 0, "updated": 0, "deleted": 0, "errors": 0}
'created': 0,
'updated': 0,
'deleted': 0,
'errors': 0
}
def progress(info=""): def progress(info=""):
progress.nb += 1 progress.nb += 1
@ -774,12 +798,13 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
return return
progress.old = bar progress.old = bar
logger.info(bar) logger.info(bar)
progress.nb = 0 progress.nb = 0
progress.old = "" progress.old = ""
def on_failure(user, exception): def on_failure(user, exception):
result['errors'] += 1 result["errors"] += 1
logger.error(user + ': ' + str(exception)) logger.error(user + ": " + str(exception))
def update(new_infos, old_infos=False): def update(new_infos, old_infos=False):
remove_alias = None remove_alias = None
@ -787,11 +812,21 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
remove_groups = [] remove_groups = []
add_groups = new_infos["groups"] add_groups = new_infos["groups"]
if old_infos: if old_infos:
new_infos['mail'] = None if old_infos['mail'] == new_infos['mail'] else new_infos['mail'] new_infos["mail"] = (
remove_alias = list(set(old_infos['mail-alias']) - set(new_infos['mail-alias'])) None if old_infos["mail"] == new_infos["mail"] else new_infos["mail"]
remove_forward = list(set(old_infos['mail-forward']) - set(new_infos['mail-forward'])) )
new_infos['mail-alias'] = list(set(new_infos['mail-alias']) - set(old_infos['mail-alias'])) remove_alias = list(
new_infos['mail-forward'] = list(set(new_infos['mail-forward']) - set(old_infos['mail-forward'])) set(old_infos["mail-alias"]) - set(new_infos["mail-alias"])
)
remove_forward = list(
set(old_infos["mail-forward"]) - set(new_infos["mail-forward"])
)
new_infos["mail-alias"] = list(
set(new_infos["mail-alias"]) - set(old_infos["mail-alias"])
)
new_infos["mail-forward"] = list(
set(new_infos["mail-forward"]) - set(old_infos["mail-forward"])
)
remove_groups = list(set(old_infos["groups"]) - set(new_infos["groups"])) remove_groups = list(set(old_infos["groups"]) - set(new_infos["groups"]))
add_groups = list(set(new_infos["groups"]) - set(old_infos["groups"])) add_groups = list(set(new_infos["groups"]) - set(old_infos["groups"]))
@ -799,69 +834,90 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
for group, infos in existing_groups.items(): for group, infos in existing_groups.items():
# Loop only on groups in 'remove_groups' # Loop only on groups in 'remove_groups'
# Ignore 'all_users' and primary group # Ignore 'all_users' and primary group
if group in ["all_users", new_infos['username']] or group not in remove_groups: if (
group in ["all_users", new_infos["username"]]
or group not in remove_groups
):
continue continue
# If the user is in this group (and it's not the primary group), # If the user is in this group (and it's not the primary group),
# remove the member from the group # remove the member from the group
if new_infos['username'] in infos["members"]: if new_infos["username"] in infos["members"]:
user_group_update(group, remove=new_infos['username'], sync_perm=False, from_import=True) user_group_update(
group,
remove=new_infos["username"],
sync_perm=False,
from_import=True,
)
user_update(new_infos['username'], user_update(
new_infos['firstname'], new_infos['lastname'], new_infos["username"],
new_infos['mail'], new_infos['password'], new_infos["firstname"],
mailbox_quota=new_infos['mailbox-quota'], new_infos["lastname"],
mail=new_infos['mail'], add_mailalias=new_infos['mail-alias'], new_infos["mail"],
remove_mailalias=remove_alias, new_infos["password"],
remove_mailforward=remove_forward, mailbox_quota=new_infos["mailbox-quota"],
add_mailforward=new_infos['mail-forward'], from_import=True) mail=new_infos["mail"],
add_mailalias=new_infos["mail-alias"],
remove_mailalias=remove_alias,
remove_mailforward=remove_forward,
add_mailforward=new_infos["mail-forward"],
from_import=True,
)
for group in add_groups: for group in add_groups:
if group in ["all_users", new_infos['username']]: if group in ["all_users", new_infos["username"]]:
continue continue
user_group_update(group, add=new_infos['username'], sync_perm=False, from_import=True) user_group_update(
group, add=new_infos["username"], sync_perm=False, from_import=True
)
users = user_list(list(FIELDS_FOR_IMPORT.keys()))['users'] users = user_list(list(FIELDS_FOR_IMPORT.keys()))["users"]
operation_logger.start() operation_logger.start()
# We do delete and update before to avoid mail uniqueness issues # We do delete and update before to avoid mail uniqueness issues
for user in actions['deleted']: for user in actions["deleted"]:
try: try:
user_delete(user, purge=True, from_import=True) user_delete(user, purge=True, from_import=True)
result['deleted'] += 1 result["deleted"] += 1
except YunohostError as e: except YunohostError as e:
on_failure(user, e) on_failure(user, e)
progress(f"Deleting {user}") progress(f"Deleting {user}")
for user in actions['updated']: for user in actions["updated"]:
try: try:
update(user, users[user['username']]) update(user, users[user["username"]])
result['updated'] += 1 result["updated"] += 1
except YunohostError as e: except YunohostError as e:
on_failure(user['username'], e) on_failure(user["username"], e)
progress(f"Updating {user['username']}") progress(f"Updating {user['username']}")
for user in actions['created']: for user in actions["created"]:
try: try:
user_create(user['username'], user_create(
user['firstname'], user['lastname'], user["username"],
user['domain'], user['password'], user["firstname"],
user['mailbox-quota'], from_import=True) user["lastname"],
user["domain"],
user["password"],
user["mailbox-quota"],
from_import=True,
)
update(user) update(user)
result['created'] += 1 result["created"] += 1
except YunohostError as e: except YunohostError as e:
on_failure(user['username'], e) on_failure(user["username"], e)
progress(f"Creating {user['username']}") progress(f"Creating {user['username']}")
permission_sync_to_user() permission_sync_to_user()
app_ssowatconf() app_ssowatconf()
if result['errors']: if result["errors"]:
msg = m18n.n('user_import_partial_failed') msg = m18n.n("user_import_partial_failed")
if result['created'] + result['updated'] + result['deleted'] == 0: if result["created"] + result["updated"] + result["deleted"] == 0:
msg = m18n.n('user_import_failed') msg = m18n.n("user_import_failed")
logger.error(msg) logger.error(msg)
operation_logger.error(msg) operation_logger.error(msg)
else: else:
logger.success(m18n.n('user_import_success')) logger.success(m18n.n("user_import_success"))
operation_logger.success() operation_logger.success()
return result return result
@ -1038,7 +1094,7 @@ def user_group_delete(operation_logger, groupname, force=False, sync_perm=True):
logger.debug(m18n.n("group_deleted", group=groupname)) logger.debug(m18n.n("group_deleted", group=groupname))
@is_unit_operation([('groupname', 'group')]) @is_unit_operation([("groupname", "group")])
def user_group_update( def user_group_update(
operation_logger, operation_logger,
groupname, groupname,
@ -1046,7 +1102,7 @@ def user_group_update(
remove=None, remove=None,
force=False, force=False,
sync_perm=True, sync_perm=True,
from_import=False from_import=False,
): ):
""" """
Update user informations Update user informations