From c197e171bbc3d18a095d0b9bb3a0f72d57afab3e Mon Sep 17 00:00:00 2001 From: yunohost-bot Date: Thu, 2 Sep 2021 15:04:02 +0000 Subject: [PATCH] [CI] Format code --- src/yunohost/log.py | 4 +- src/yunohost/tests/test_user-group.py | 80 +++--- src/yunohost/user.py | 386 +++++++++++++++----------- 3 files changed, 269 insertions(+), 201 deletions(-) diff --git a/src/yunohost/log.py b/src/yunohost/log.py index 6460d8d4a..3f6382af2 100644 --- a/src/yunohost/log.py +++ b/src/yunohost/log.py @@ -373,7 +373,7 @@ def is_unit_operation( context.pop(field, None) # Context is made from args given to main function by argparse - # This context will be added in extra parameters in yml file, so this context should + # This context will be added in extra parameters in yml file, so this context should # be serializable and short enough (it will be displayed in webadmin) # Argparse can provide some File or Stream, so here we display the filename or # the IOBase, if we have no name. @@ -382,7 +382,7 @@ def is_unit_operation( try: context[field] = value.name except: - context[field] = 'IOBase' + context[field] = "IOBase" operation_logger = OperationLogger(op_key, related_to, args=context) try: diff --git a/src/yunohost/tests/test_user-group.py b/src/yunohost/tests/test_user-group.py index ab7e72555..60e748108 100644 --- a/src/yunohost/tests/test_user-group.py +++ b/src/yunohost/tests/test_user-group.py @@ -117,53 +117,65 @@ def test_del_user(mocker): def test_import_user(mocker): import csv from io import StringIO - fieldnames = [u'username', u'firstname', u'lastname', u'password', - u'mailbox-quota', u'mail', u'mail-alias', u'mail-forward', - u'groups'] + + fieldnames = [ + "username", + "firstname", + "lastname", + "password", + "mailbox-quota", + "mail", + "mail-alias", + "mail-forward", + "groups", + ] with StringIO() as csv_io: - writer = csv.DictWriter(csv_io, fieldnames, delimiter=';', - quotechar='"') + writer = csv.DictWriter(csv_io, fieldnames, delimiter=";", quotechar='"') writer.writeheader() - writer.writerow({ - 'username': "albert", - 'firstname': "Albert", - 'lastname': "Good", - 'password': "", - 'mailbox-quota': "1G", - 'mail': "albert@" + maindomain, - 'mail-alias': "albert2@" + maindomain, - 'mail-forward': "albert@example.com", - 'groups': "dev", - }) - writer.writerow({ - 'username': "alice", - 'firstname': "Alice", - 'lastname': "White", - 'password': "", - 'mailbox-quota': "1G", - 'mail': "alice@" + maindomain, - 'mail-alias': "alice1@" + maindomain + ",alice2@" + maindomain, - 'mail-forward': "", - 'groups': "apps", - }) + writer.writerow( + { + "username": "albert", + "firstname": "Albert", + "lastname": "Good", + "password": "", + "mailbox-quota": "1G", + "mail": "albert@" + maindomain, + "mail-alias": "albert2@" + maindomain, + "mail-forward": "albert@example.com", + "groups": "dev", + } + ) + writer.writerow( + { + "username": "alice", + "firstname": "Alice", + "lastname": "White", + "password": "", + "mailbox-quota": "1G", + "mail": "alice@" + maindomain, + "mail-alias": "alice1@" + maindomain + ",alice2@" + maindomain, + "mail-forward": "", + "groups": "apps", + } + ) csv_io.seek(0) with message(mocker, "user_import_success"): user_import(csv_io, update=True, delete=True) - group_res = user_group_list()['groups'] - user_res = user_list(list(FIELDS_FOR_IMPORT.keys()))['users'] + group_res = user_group_list()["groups"] + user_res = user_list(list(FIELDS_FOR_IMPORT.keys()))["users"] assert "albert" in user_res assert "alice" in user_res assert "bob" not in user_res - assert len(user_res['alice']['mail-alias']) == 2 - assert "albert" in group_res['dev']['members'] - assert "alice" in group_res['apps']['members'] - assert "alice" not in group_res['dev']['members'] + assert len(user_res["alice"]["mail-alias"]) == 2 + assert "albert" in group_res["dev"]["members"] + assert "alice" in group_res["apps"]["members"] + assert "alice" not in group_res["dev"]["members"] def test_export_user(mocker): result = user_export() - aliases = ','.join([alias + maindomain for alias in FIRST_ALIASES]) + aliases = ",".join([alias + maindomain for alias in FIRST_ALIASES]) should_be = ( "username;firstname;lastname;password;mail;mail-alias;mail-forward;mailbox-quota;groups\r\n" f"alice;Alice;White;;alice@{maindomain};{aliases};;0;dev\r\n" diff --git a/src/yunohost/user.py b/src/yunohost/user.py index 07d1773b3..c89f9a05f 100644 --- a/src/yunohost/user.py +++ b/src/yunohost/user.py @@ -44,18 +44,18 @@ from yunohost.log import is_unit_operation logger = getActionLogger("yunohost.user") FIELDS_FOR_IMPORT = { - 'username': r'^[a-z0-9_]+$', - 'firstname': r'^([^\W\d_]{1,30}[ ,.\'-]{0,3})+$', - 'lastname': r'^([^\W\d_]{1,30}[ ,.\'-]{0,3})+$', - 'password': r'^|(.{3,})$', - 'mail': r'^([\w.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}))$', - 'mail-alias': r'^|([\w.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}),?)+$', - 'mail-forward': r'^|([\w\+.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}),?)+$', - 'mailbox-quota': r'^(\d+[bkMGT])|0|$', - 'groups': r'^|([a-z0-9_]+(,?[a-z0-9_]+)*)$' + "username": r"^[a-z0-9_]+$", + "firstname": r"^([^\W\d_]{1,30}[ ,.\'-]{0,3})+$", + "lastname": r"^([^\W\d_]{1,30}[ ,.\'-]{0,3})+$", + "password": r"^|(.{3,})$", + "mail": r"^([\w.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}))$", + "mail-alias": r"^|([\w.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}),?)+$", + "mail-forward": r"^|([\w\+.-]+@([^\W_A-Z]+([-]*[^\W_A-Z]+)*\.)+((xn--)?[^\W_]{2,}),?)+$", + "mailbox-quota": r"^(\d+[bkMGT])|0|$", + "groups": r"^|([a-z0-9_]+(,?[a-z0-9_]+)*)$", } -FIRST_ALIASES = ['root@', 'admin@', 'webmaster@', 'postmaster@', 'abuse@'] +FIRST_ALIASES = ["root@", "admin@", "webmaster@", "postmaster@", "abuse@"] def user_list(fields=None): @@ -63,47 +63,51 @@ def user_list(fields=None): from yunohost.utils.ldap import _get_ldap_interface ldap_attrs = { - 'username': 'uid', - 'password': '', # We can't request password in ldap - 'fullname': 'cn', - 'firstname': 'givenName', - 'lastname': 'sn', - 'mail': 'mail', - 'mail-alias': 'mail', - 'mail-forward': 'maildrop', - 'mailbox-quota': 'mailuserquota', - 'groups': 'memberOf', - 'shell': 'loginShell', - 'home-path': 'homeDirectory' + "username": "uid", + "password": "", # We can't request password in ldap + "fullname": "cn", + "firstname": "givenName", + "lastname": "sn", + "mail": "mail", + "mail-alias": "mail", + "mail-forward": "maildrop", + "mailbox-quota": "mailuserquota", + "groups": "memberOf", + "shell": "loginShell", + "home-path": "homeDirectory", } def display_default(values, _): return values[0] if len(values) == 1 else values display = { - 'password': lambda values, user: '', - 'mail': lambda values, user: display_default(values[:1], user), - 'mail-alias': lambda values, _: values[1:], - 'mail-forward': lambda values, user: [forward for forward in values if forward != user['uid'][0]], - 'groups': lambda values, user: [ - group[3:].split(',')[0] + "password": lambda values, user: "", + "mail": lambda values, user: display_default(values[:1], user), + "mail-alias": lambda values, _: values[1:], + "mail-forward": lambda values, user: [ + forward for forward in values if forward != user["uid"][0] + ], + "groups": lambda values, user: [ + group[3:].split(",")[0] for group in values - if not group.startswith('cn=all_users,') and - not group.startswith('cn=' + user['uid'][0] + ',')], - 'shell': lambda values, _: len(values) > 0 and values[0].strip() == "/bin/false" + if not group.startswith("cn=all_users,") + and not group.startswith("cn=" + user["uid"][0] + ",") + ], + "shell": lambda values, _: len(values) > 0 + and values[0].strip() == "/bin/false", } - attrs = set(['uid']) + attrs = set(["uid"]) users = {} if not fields: - fields = ['username', 'fullname', 'mail', 'mailbox-quota'] + fields = ["username", "fullname", "mail", "mailbox-quota"] for field in fields: if field in ldap_attrs: attrs.add(ldap_attrs[field]) else: - raise YunohostError('field_invalid', field) + raise YunohostError("field_invalid", field) ldap = _get_ldap_interface() result = ldap.search( @@ -120,7 +124,7 @@ def user_list(fields=None): values = user[ldap_attrs[field]] entry[field] = display.get(field, display_default)(values, user) - users[user['uid'][0]] = entry + users[user["uid"][0]] = entry return {"users": users} @@ -135,7 +139,7 @@ def user_create( password, mailbox_quota="0", mail=None, - from_import=False + from_import=False, ): from yunohost.domain import domain_list, _get_maindomain @@ -253,10 +257,9 @@ def user_create( # Attempt to create user home folder subprocess.check_call(["mkhomedir_helper", username]) except subprocess.CalledProcessError: - home = f'/home/{username}' + home = f"/home/{username}" if not os.path.isdir(home): - logger.warning(m18n.n('user_home_creation_failed', home=home), - exc_info=1) + logger.warning(m18n.n("user_home_creation_failed", home=home), exc_info=1) try: subprocess.check_call( @@ -282,12 +285,12 @@ def user_create( # TODO: Send a welcome mail to user if not from_import: - logger.success(m18n.n('user_created')) + logger.success(m18n.n("user_created")) return {"fullname": fullname, "username": username, "mail": mail} -@is_unit_operation([('username', 'user')]) +@is_unit_operation([("username", "user")]) def user_delete(operation_logger, username, purge=False, from_import=False): """ Delete user @@ -331,13 +334,14 @@ def user_delete(operation_logger, username, purge=False, from_import=False): subprocess.call(["nscd", "-i", "passwd"]) if purge: - subprocess.call(['rm', '-rf', '/home/{0}'.format(username)]) - subprocess.call(['rm', '-rf', '/var/mail/{0}'.format(username)]) + subprocess.call(["rm", "-rf", "/home/{0}".format(username)]) + subprocess.call(["rm", "-rf", "/var/mail/{0}".format(username)]) - hook_callback('post_user_delete', args=[username, purge]) + hook_callback("post_user_delete", args=[username, purge]) if not from_import: - logger.success(m18n.n('user_deleted')) + logger.success(m18n.n("user_deleted")) + @is_unit_operation([("username", "user")], exclude=["change_password"]) def user_update( @@ -352,7 +356,7 @@ def user_update( add_mailalias=None, remove_mailalias=None, mailbox_quota=None, - from_import=False + from_import=False, ): """ Update user informations @@ -412,7 +416,7 @@ def user_update( ] # change_password is None if user_update is not called to change the password - if change_password is not None and change_password != '': + if change_password is not None and change_password != "": # when in the cli interface if the option to change the password is called # without a specified value, change_password will be set to the const 0. # In this case we prompt for the new password. @@ -429,20 +433,22 @@ def user_update( aliases = [alias + main_domain for alias in FIRST_ALIASES] # If the requested mail address is already as main address or as an alias by this user - if mail in user['mail']: - user['mail'].remove(mail) + if mail in user["mail"]: + user["mail"].remove(mail) # Othewise, check that this mail address is not already used by this user else: try: - ldap.validate_uniqueness({'mail': mail}) + ldap.validate_uniqueness({"mail": mail}) except Exception as e: - raise YunohostError('user_update_failed', user=username, error=e) - if mail[mail.find('@') + 1:] not in domains: - raise YunohostError('mail_domain_unknown', domain=mail[mail.find('@') + 1:]) + raise YunohostError("user_update_failed", user=username, error=e) + if mail[mail.find("@") + 1 :] not in domains: + raise YunohostError( + "mail_domain_unknown", domain=mail[mail.find("@") + 1 :] + ) if mail in aliases: raise YunohostValidationError("mail_unavailable") - new_attr_dict['mail'] = [mail] + user['mail'][1:] + new_attr_dict["mail"] = [mail] + user["mail"][1:] if add_mailalias: if not isinstance(add_mailalias, list): @@ -455,12 +461,10 @@ def user_update( try: ldap.validate_uniqueness({"mail": mail}) except Exception as e: - raise YunohostError( - "user_update_failed", user=username, error=e - ) - if mail[mail.find("@") + 1:] not in domains: + raise YunohostError("user_update_failed", user=username, error=e) + if mail[mail.find("@") + 1 :] not in domains: raise YunohostError( - "mail_domain_unknown", domain=mail[mail.find("@") + 1:] + "mail_domain_unknown", domain=mail[mail.find("@") + 1 :] ) user["mail"].append(mail) new_attr_dict["mail"] = user["mail"] @@ -517,7 +521,7 @@ def user_update( if not from_import: app_ssowatconf() - logger.success(m18n.n('user_updated')) + logger.success(m18n.n("user_updated")) return user_info(username) @@ -548,13 +552,13 @@ def user_info(username): raise YunohostValidationError("user_unknown", user=username) result_dict = { - 'username': user['uid'][0], - 'fullname': user['cn'][0], - 'firstname': user['givenName'][0], - 'lastname': user['sn'][0], - 'mail': user['mail'][0], - 'mail-aliases': [], - 'mail-forward': [] + "username": user["uid"][0], + "fullname": user["cn"][0], + "firstname": user["givenName"][0], + "lastname": user["sn"][0], + "mail": user["mail"][0], + "mail-aliases": [], + "mail-forward": [], } if len(user["mail"]) > 1: @@ -619,27 +623,32 @@ def user_export(): """ import csv # CSV are needed only in this function from io import StringIO + with StringIO() as csv_io: - writer = csv.DictWriter(csv_io, list(FIELDS_FOR_IMPORT.keys()), - delimiter=';', quotechar='"') + writer = csv.DictWriter( + csv_io, list(FIELDS_FOR_IMPORT.keys()), delimiter=";", quotechar='"' + ) writer.writeheader() - users = user_list(list(FIELDS_FOR_IMPORT.keys()))['users'] + users = user_list(list(FIELDS_FOR_IMPORT.keys()))["users"] for username, user in users.items(): - user['mail-alias'] = ','.join(user['mail-alias']) - user['mail-forward'] = ','.join(user['mail-forward']) - user['groups'] = ','.join(user['groups']) + user["mail-alias"] = ",".join(user["mail-alias"]) + user["mail-forward"] = ",".join(user["mail-forward"]) + user["groups"] = ",".join(user["groups"]) writer.writerow(user) body = csv_io.getvalue().rstrip() - if Moulinette.interface.type == 'api': + if Moulinette.interface.type == "api": # We return a raw bottle HTTPresponse (instead of serializable data like # list/dict, ...), which is gonna be picked and used directly by moulinette from bottle import HTTPResponse - response = HTTPResponse(body=body, - headers={ - "Content-Disposition": "attachment; filename=users.csv", - "Content-Type": "text/csv", - }) + + response = HTTPResponse( + body=body, + headers={ + "Content-Disposition": "attachment; filename=users.csv", + "Content-Type": "text/csv", + }, + ) return response else: return body @@ -662,106 +671,121 @@ def user_import(operation_logger, csvfile, update=False, delete=False): from yunohost.domain import domain_list # Pre-validate data and prepare what should be done - actions = { - 'created': [], - 'updated': [], - 'deleted': [] - } + actions = {"created": [], "updated": [], "deleted": []} is_well_formatted = True def to_list(str_list): - L = str_list.split(',') if str_list else [] + L = str_list.split(",") if str_list else [] L = [l.strip() for l in L] return L - existing_users = user_list()['users'] + existing_users = user_list()["users"] existing_groups = user_group_list()["groups"] existing_domains = domain_list()["domains"] - reader = csv.DictReader(csvfile, delimiter=';', quotechar='"') + reader = csv.DictReader(csvfile, delimiter=";", quotechar='"') users_in_csv = [] - missing_columns = [key for key in FIELDS_FOR_IMPORT.keys() if key not in reader.fieldnames] + missing_columns = [ + key for key in FIELDS_FOR_IMPORT.keys() if key not in reader.fieldnames + ] if missing_columns: - raise YunohostValidationError("user_import_missing_columns", columns=', '.join(missing_columns)) + raise YunohostValidationError( + "user_import_missing_columns", columns=", ".join(missing_columns) + ) for user in reader: # Validate column values against regexes - format_errors = [f"{key}: '{user[key]}' doesn't match the expected format" - for key, validator in FIELDS_FOR_IMPORT.items() - if user[key] is None or not re.match(validator, user[key])] + format_errors = [ + f"{key}: '{user[key]}' doesn't match the expected format" + for key, validator in FIELDS_FOR_IMPORT.items() + if user[key] is None or not re.match(validator, user[key]) + ] # Check for duplicated username lines - if user['username'] in users_in_csv: + if user["username"] in users_in_csv: format_errors.append(f"username '{user['username']}' duplicated") - users_in_csv.append(user['username']) + users_in_csv.append(user["username"]) # Validate that groups exist - user['groups'] = to_list(user['groups']) - unknown_groups = [g for g in user['groups'] if g not in existing_groups] + user["groups"] = to_list(user["groups"]) + unknown_groups = [g for g in user["groups"] if g not in existing_groups] if unknown_groups: - format_errors.append(f"username '{user['username']}': unknown groups %s" % ', '.join(unknown_groups)) + format_errors.append( + f"username '{user['username']}': unknown groups %s" + % ", ".join(unknown_groups) + ) # Validate that domains exist - user['mail-alias'] = to_list(user['mail-alias']) - user['mail-forward'] = to_list(user['mail-forward']) - user['domain'] = user['mail'].split('@')[1] + user["mail-alias"] = to_list(user["mail-alias"]) + user["mail-forward"] = to_list(user["mail-forward"]) + user["domain"] = user["mail"].split("@")[1] unknown_domains = [] - if user['domain'] not in existing_domains: - unknown_domains.append(user['domain']) + if user["domain"] not in existing_domains: + unknown_domains.append(user["domain"]) - unknown_domains += [mail.split('@', 1)[1] for mail in user['mail-alias'] if mail.split('@', 1)[1] not in existing_domains] + unknown_domains += [ + mail.split("@", 1)[1] + for mail in user["mail-alias"] + if mail.split("@", 1)[1] not in existing_domains + ] unknown_domains = set(unknown_domains) if unknown_domains: - format_errors.append(f"username '{user['username']}': unknown domains %s" % ', '.join(unknown_domains)) + format_errors.append( + f"username '{user['username']}': unknown domains %s" + % ", ".join(unknown_domains) + ) if format_errors: - logger.error(m18n.n('user_import_bad_line', - line=reader.line_num, - details=', '.join(format_errors))) + logger.error( + m18n.n( + "user_import_bad_line", + line=reader.line_num, + details=", ".join(format_errors), + ) + ) is_well_formatted = False continue # Choose what to do with this line and prepare data - user['mailbox-quota'] = user['mailbox-quota'] or "0" + user["mailbox-quota"] = user["mailbox-quota"] or "0" # User creation - if user['username'] not in existing_users: + if user["username"] not in existing_users: # Generate password if not exists # This could be used when reset password will be merged - if not user['password']: - user['password'] = random_ascii(70) - actions['created'].append(user) + if not user["password"]: + user["password"] = random_ascii(70) + actions["created"].append(user) # User update elif update: - actions['updated'].append(user) + actions["updated"].append(user) if delete: - actions['deleted'] = [user for user in existing_users if user not in users_in_csv] + actions["deleted"] = [ + user for user in existing_users if user not in users_in_csv + ] if delete and not users_in_csv: - logger.error("You used the delete option with an empty csv file ... You probably did not really mean to do that, did you !?") + logger.error( + "You used the delete option with an empty csv file ... You probably did not really mean to do that, did you !?" + ) is_well_formatted = False if not is_well_formatted: - raise YunohostValidationError('user_import_bad_file') + raise YunohostValidationError("user_import_bad_file") - total = len(actions['created'] + actions['updated'] + actions['deleted']) + total = len(actions["created"] + actions["updated"] + actions["deleted"]) if total == 0: - logger.info(m18n.n('user_import_nothing_to_do')) + logger.info(m18n.n("user_import_nothing_to_do")) return # Apply creation, update and deletion operation - result = { - 'created': 0, - 'updated': 0, - 'deleted': 0, - 'errors': 0 - } + result = {"created": 0, "updated": 0, "deleted": 0, "errors": 0} def progress(info=""): progress.nb += 1 @@ -774,12 +798,13 @@ def user_import(operation_logger, csvfile, update=False, delete=False): return progress.old = bar logger.info(bar) + progress.nb = 0 progress.old = "" def on_failure(user, exception): - result['errors'] += 1 - logger.error(user + ': ' + str(exception)) + result["errors"] += 1 + logger.error(user + ": " + str(exception)) def update(new_infos, old_infos=False): remove_alias = None @@ -787,11 +812,21 @@ def user_import(operation_logger, csvfile, update=False, delete=False): remove_groups = [] add_groups = new_infos["groups"] if old_infos: - new_infos['mail'] = None if old_infos['mail'] == new_infos['mail'] else new_infos['mail'] - remove_alias = list(set(old_infos['mail-alias']) - set(new_infos['mail-alias'])) - remove_forward = list(set(old_infos['mail-forward']) - set(new_infos['mail-forward'])) - new_infos['mail-alias'] = list(set(new_infos['mail-alias']) - set(old_infos['mail-alias'])) - new_infos['mail-forward'] = list(set(new_infos['mail-forward']) - set(old_infos['mail-forward'])) + new_infos["mail"] = ( + None if old_infos["mail"] == new_infos["mail"] else new_infos["mail"] + ) + remove_alias = list( + set(old_infos["mail-alias"]) - set(new_infos["mail-alias"]) + ) + remove_forward = list( + set(old_infos["mail-forward"]) - set(new_infos["mail-forward"]) + ) + new_infos["mail-alias"] = list( + set(new_infos["mail-alias"]) - set(old_infos["mail-alias"]) + ) + new_infos["mail-forward"] = list( + set(new_infos["mail-forward"]) - set(old_infos["mail-forward"]) + ) remove_groups = list(set(old_infos["groups"]) - set(new_infos["groups"])) add_groups = list(set(new_infos["groups"]) - set(old_infos["groups"])) @@ -799,69 +834,90 @@ def user_import(operation_logger, csvfile, update=False, delete=False): for group, infos in existing_groups.items(): # Loop only on groups in 'remove_groups' # Ignore 'all_users' and primary group - if group in ["all_users", new_infos['username']] or group not in remove_groups: + if ( + group in ["all_users", new_infos["username"]] + or group not in remove_groups + ): continue # If the user is in this group (and it's not the primary group), # remove the member from the group - if new_infos['username'] in infos["members"]: - user_group_update(group, remove=new_infos['username'], sync_perm=False, from_import=True) + if new_infos["username"] in infos["members"]: + user_group_update( + group, + remove=new_infos["username"], + sync_perm=False, + from_import=True, + ) - user_update(new_infos['username'], - new_infos['firstname'], new_infos['lastname'], - new_infos['mail'], new_infos['password'], - mailbox_quota=new_infos['mailbox-quota'], - mail=new_infos['mail'], add_mailalias=new_infos['mail-alias'], - remove_mailalias=remove_alias, - remove_mailforward=remove_forward, - add_mailforward=new_infos['mail-forward'], from_import=True) + user_update( + new_infos["username"], + new_infos["firstname"], + new_infos["lastname"], + new_infos["mail"], + new_infos["password"], + mailbox_quota=new_infos["mailbox-quota"], + mail=new_infos["mail"], + add_mailalias=new_infos["mail-alias"], + remove_mailalias=remove_alias, + remove_mailforward=remove_forward, + add_mailforward=new_infos["mail-forward"], + from_import=True, + ) for group in add_groups: - if group in ["all_users", new_infos['username']]: + if group in ["all_users", new_infos["username"]]: continue - user_group_update(group, add=new_infos['username'], sync_perm=False, from_import=True) + user_group_update( + group, add=new_infos["username"], sync_perm=False, from_import=True + ) - users = user_list(list(FIELDS_FOR_IMPORT.keys()))['users'] + users = user_list(list(FIELDS_FOR_IMPORT.keys()))["users"] operation_logger.start() # We do delete and update before to avoid mail uniqueness issues - for user in actions['deleted']: + for user in actions["deleted"]: try: user_delete(user, purge=True, from_import=True) - result['deleted'] += 1 + result["deleted"] += 1 except YunohostError as e: on_failure(user, e) progress(f"Deleting {user}") - for user in actions['updated']: + for user in actions["updated"]: try: - update(user, users[user['username']]) - result['updated'] += 1 + update(user, users[user["username"]]) + result["updated"] += 1 except YunohostError as e: - on_failure(user['username'], e) + on_failure(user["username"], e) progress(f"Updating {user['username']}") - for user in actions['created']: + for user in actions["created"]: try: - user_create(user['username'], - user['firstname'], user['lastname'], - user['domain'], user['password'], - user['mailbox-quota'], from_import=True) + user_create( + user["username"], + user["firstname"], + user["lastname"], + user["domain"], + user["password"], + user["mailbox-quota"], + from_import=True, + ) update(user) - result['created'] += 1 + result["created"] += 1 except YunohostError as e: - on_failure(user['username'], e) + on_failure(user["username"], e) progress(f"Creating {user['username']}") permission_sync_to_user() app_ssowatconf() - if result['errors']: - msg = m18n.n('user_import_partial_failed') - if result['created'] + result['updated'] + result['deleted'] == 0: - msg = m18n.n('user_import_failed') + if result["errors"]: + msg = m18n.n("user_import_partial_failed") + if result["created"] + result["updated"] + result["deleted"] == 0: + msg = m18n.n("user_import_failed") logger.error(msg) operation_logger.error(msg) else: - logger.success(m18n.n('user_import_success')) + logger.success(m18n.n("user_import_success")) operation_logger.success() return result @@ -1038,7 +1094,7 @@ def user_group_delete(operation_logger, groupname, force=False, sync_perm=True): logger.debug(m18n.n("group_deleted", group=groupname)) -@is_unit_operation([('groupname', 'group')]) +@is_unit_operation([("groupname", "group")]) def user_group_update( operation_logger, groupname, @@ -1046,7 +1102,7 @@ def user_group_update( remove=None, force=False, sync_perm=True, - from_import=False + from_import=False, ): """ Update user informations