quality: add type hints to user.py

This commit is contained in:
Alexandre Aubin 2024-08-15 01:46:55 +02:00
parent 4ee8d4e8ca
commit 1ba75df0e2
3 changed files with 110 additions and 100 deletions

View file

@ -218,10 +218,6 @@ user:
action_help: List existing groups
api: GET /users/groups
arguments:
-s:
full: --short
help: List only the names of groups
action: store_true
-f:
full: --full
help: Display all informations known about each groups

View file

@ -24,6 +24,7 @@ import random
import subprocess
import copy
from logging import getLogger
from typing import TYPE_CHECKING, Literal, Any, TextIO, Optional, Callable, cast
from moulinette import Moulinette, m18n
from moulinette.utils.process import check_output
@ -33,7 +34,14 @@ from yunohost.service import service_status
from yunohost.log import is_unit_operation
from yunohost.utils.system import binary_to_human
logger = getLogger("yunohost.user")
if TYPE_CHECKING:
from yunohost.log import OperationLogger
from moulinette.utils.log import MoulinetteLogger
from bottle import HTTPResponse as HTTPResponseType
logger = cast(MoulinetteLogger, getLogger("yunohost.user"))
else:
logger = getLogger("yunohost.user")
FIELDS_FOR_IMPORT = {
"username": r"^[a-z0-9_.]+$",
@ -49,8 +57,7 @@ FIELDS_FOR_IMPORT = {
ADMIN_ALIASES = ["root", "admin", "admins", "webmaster", "postmaster", "abuse"]
def user_list(fields=None):
def user_list(fields: Optional[list[str]] = None) -> dict[str, dict[str, Any]] :
from yunohost.utils.ldap import _get_ldap_interface
ldap_attrs = {
@ -71,7 +78,7 @@ def user_list(fields=None):
def display_default(values, _):
return values[0] if len(values) == 1 else values
display = {
display: dict[str, Callable[[list[str], dict], Any]] = {
"password": lambda values, user: "",
"mail": lambda values, user: display_default(values[:1], user),
"mail-alias": lambda values, _: values[1:],
@ -108,15 +115,18 @@ def user_list(fields=None):
)
for user in result:
entry = {}
entry: dict[str, str] = {}
for field in fields:
values = []
if ldap_attrs[field] in user:
values = user[ldap_attrs[field]]
entry[field] = display.get(field, display_default)(values, user)
users[user["uid"][0]] = entry
username: str = user["uid"][0]
users[username] = entry
# Dict entry 0 has incompatible type "str": "dict[Any, dict[str, Any]]";
# expected "str": "dict[str, str]" [dict-item]
return {"users": users}
@ -134,17 +144,17 @@ def shellexists(shell):
@is_unit_operation([("username", "user")])
def user_create(
operation_logger,
username,
domain,
password,
fullname=None,
operation_logger: OperationLogger,
username: str,
domain: str,
password: str,
fullname: str,
mailbox_quota="0",
admin=False,
from_import=False,
admin: bool = False,
from_import: bool = False,
loginShell=None,
):
if not fullname or not fullname.strip():
) -> dict[str, str]:
if not fullname.strip():
raise YunohostValidationError(
"You should specify the fullname of the user using option -F"
)
@ -270,12 +280,12 @@ def user_create(
except subprocess.CalledProcessError:
home = f"/home/{username}"
if not os.path.isdir(home):
logger.warning(m18n.n("user_home_creation_failed", home=home), exc_info=1)
logger.warning(m18n.n("user_home_creation_failed", home=home), exc_info=True)
try:
subprocess.check_call(["setfacl", "-m", "g:all_users:---", f"/home/{username}"])
except subprocess.CalledProcessError:
logger.warning(f"Failed to protect /home/{username}", exc_info=1)
logger.warning(f"Failed to protect /home/{username}", exc_info=True)
# Create group for user and add to group 'all_users'
user_group_create(groupname=username, gid=uid, primary_group=True, sync_perm=False)
@ -302,7 +312,7 @@ def user_create(
@is_unit_operation([("username", "user")])
def user_delete(operation_logger, username, purge=False, from_import=False):
def user_delete(operation_logger: OperationLogger, username: str, purge: bool = False, from_import: bool =False):
from yunohost.hook import hook_callback
from yunohost.utils.ldap import _get_ldap_interface
from yunohost.authenticators.ldap_ynhuser import Authenticator as PortalAuth
@ -353,18 +363,18 @@ def user_delete(operation_logger, username, purge=False, from_import=False):
@is_unit_operation([("username", "user")], exclude=["change_password"])
def user_update(
operation_logger,
username,
mail=None,
change_password=None,
add_mailforward=None,
remove_mailforward=None,
add_mailalias=None,
remove_mailalias=None,
mailbox_quota=None,
from_import=False,
fullname=None,
loginShell=None,
operation_logger: OperationLogger,
username: str,
mail: Optional[str] = None,
change_password: Optional[str] = None,
add_mailforward: None | str | list[str] = None,
remove_mailforward: None | str | list[str] = None,
add_mailalias: None | str | list[str] = None,
remove_mailalias: None | str | list[str] = None,
mailbox_quota: Optional[str] = None,
from_import: bool = False,
fullname: Optional[str] = None,
loginShell: Optional[str] = None,
):
if fullname and fullname.strip():
fullname = fullname.strip()
@ -399,7 +409,7 @@ def user_update(
if not result:
raise YunohostValidationError("user_unknown", user=username)
user = result[0]
env_dict = {"YNH_USER_USERNAME": username}
env_dict: dict[str, str] = {"YNH_USER_USERNAME": username}
# Get modifications from arguments
new_attr_dict = {}
@ -428,9 +438,9 @@ def user_update(
# without a specified value, change_password will be set to the const 0.
# In this case we prompt for the new password.
if Moulinette.interface.type == "cli" and not change_password:
change_password = Moulinette.prompt(
change_password = cast(str, Moulinette.prompt(
m18n.n("ask_password"), is_password=True, confirm=True
)
))
# Ensure compatibility and sufficiently complex password
assert_password_is_compatible(change_password)
@ -462,7 +472,7 @@ def user_update(
new_attr_dict["mail"] = [mail] + user["mail"][1:]
if add_mailalias:
if add_mailalias is not None:
if not isinstance(add_mailalias, list):
add_mailalias = [add_mailalias]
for mail in add_mailalias:
@ -555,7 +565,7 @@ def user_update(
return user_info(username)
def user_info(username):
def user_info(username: str) -> dict[str, str]:
"""
Get user informations
@ -624,8 +634,8 @@ def user_info(username):
has_value = re.search(r"Value=(\d+)", cmd_result)
if has_value:
storage_use = int(has_value.group(1)) * 1000
storage_use = binary_to_human(storage_use)
storage_use_int = int(has_value.group(1)) * 1000
storage_use = binary_to_human(storage_use_int)
if is_limited:
has_percent = re.search(r"%=(\d+)", cmd_result)
@ -642,7 +652,7 @@ def user_info(username):
return result_dict
def user_export():
def user_export() -> str | HTTPResponseType:
"""
Export users into CSV
@ -684,7 +694,7 @@ def user_export():
@is_unit_operation()
def user_import(operation_logger, csvfile, update=False, delete=False):
def user_import(operation_logger: OperationLogger, csvfile: TextIO, update: bool = False, delete: bool = False) -> dict[str, int]:
"""
Import users from CSV
@ -700,7 +710,7 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
from yunohost.domain import domain_list
# Pre-validate data and prepare what should be done
actions = {"created": [], "updated": [], "deleted": []}
actions: dict[str, list[dict[str, Any]]] = {"created": [], "updated": [], "deleted": []}
is_well_formatted = True
def to_list(str_list):
@ -713,10 +723,11 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
existing_domains = domain_list()["domains"]
reader = csv.DictReader(csvfile, delimiter=";", quotechar='"')
reader_fields = cast(list[str], reader.fieldnames)
users_in_csv = []
missing_columns = [
key for key in FIELDS_FOR_IMPORT.keys() if key not in reader.fieldnames
missing_columns: list[str] = [
key for key in FIELDS_FOR_IMPORT.keys() if key not in reader_fields
]
if missing_columns:
raise YunohostValidationError(
@ -758,7 +769,7 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
for mail in user["mail-alias"]
if mail.split("@", 1)[1] not in existing_domains
]
unknown_domains = set(unknown_domains)
unknown_domains = list(set(unknown_domains))
if unknown_domains:
format_errors.append(
@ -792,7 +803,7 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
if delete:
actions["deleted"] = [
user for user in existing_users if user not in users_in_csv
{"username": user} for user in existing_users if user not in users_in_csv
]
if delete and not users_in_csv:
@ -808,7 +819,7 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
if total == 0:
logger.info(m18n.n("user_import_nothing_to_do"))
return
return {}
# Apply creation, update and deletion operation
result = {"created": 0, "updated": 0, "deleted": 0, "errors": 0}
@ -825,14 +836,14 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
progress.old = bar
logger.info(bar)
progress.nb = 0
progress.old = ""
progress.nb = 0 # type: ignore[attr-defined]
progress.old = "" # type: ignore[attr-defined]
def on_failure(user, exception):
def _on_failure(user, exception):
result["errors"] += 1
logger.error(user + ": " + str(exception))
def update(new_infos, old_infos=False):
def _import_update(new_infos, old_infos=False):
remove_alias = None
remove_forward = None
remove_groups = []
@ -900,18 +911,18 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
# We do delete and update before to avoid mail uniqueness issues
for user in actions["deleted"]:
try:
user_delete(user, purge=True, from_import=True)
user_delete(user["username"], purge=True, from_import=True)
result["deleted"] += 1
except YunohostError as e:
on_failure(user, e)
_on_failure(user, e)
progress(f"Deleting {user}")
for user in actions["updated"]:
try:
update(user, users[user["username"]])
_import_update(user, users[user["username"]])
result["updated"] += 1
except YunohostError as e:
on_failure(user["username"], e)
_on_failure(user["username"], e)
progress(f"Updating {user['username']}")
for user in actions["created"]:
@ -924,10 +935,10 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
from_import=True,
fullname=(user["firstname"] + " " + user["lastname"]).strip(),
)
update(user)
_import_update(user)
result["created"] += 1
except YunohostError as e:
on_failure(user["username"], e)
_on_failure(user["username"], e)
progress(f"Creating {user['username']}")
permission_sync_to_user()
@ -948,12 +959,11 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
#
# Group subcategory
#
def user_group_list(short=False, full=False, include_primary_groups=True):
def user_group_list(full: bool = False, include_primary_groups: bool = True) -> dict[str, dict[str, dict]]:
"""
List users
Keyword argument:
short -- Only list the name of the groups without any additional info
full -- List all the info available for each groups
include_primary_groups -- Include groups corresponding to users (which should always only contains this user)
This option is set to false by default in the action map because we don't want to have
@ -975,7 +985,7 @@ def user_group_list(short=False, full=False, include_primary_groups=True):
# Parse / organize information to be outputed
users = user_list()["users"]
groups = {}
groups: dict[str, dict[str, Any]] = {}
for infos in groups_infos:
name = infos["cn"][0]
@ -993,16 +1003,13 @@ def user_group_list(short=False, full=False, include_primary_groups=True):
_ldap_path_extract(p, "cn") for p in infos.get("permission", [])
]
if short:
groups = list(groups.keys())
return {"groups": groups}
@is_unit_operation([("groupname", "group")])
def user_group_create(
operation_logger, groupname, gid=None, primary_group=False, sync_perm=True
):
operation_logger: OperationLogger, groupname: str, gid: Optional[int] = None, primary_group: bool = False, sync_perm: bool = True
) -> dict[str, str]:
"""
Create group
@ -1041,7 +1048,7 @@ def user_group_create(
uid_guid_found = False
while not uid_guid_found:
gid = str(random.randint(200, 99999))
gid = random.randint(200, 99999)
uid_guid_found = gid not in all_gid
attr_dict = {
@ -1074,7 +1081,7 @@ def user_group_create(
@is_unit_operation([("groupname", "group")])
def user_group_delete(operation_logger, groupname, force=False, sync_perm=True):
def user_group_delete(operation_logger: OperationLogger, groupname: str, force: bool = False, sync_perm: bool = True) -> None:
"""
Delete user
@ -1116,16 +1123,16 @@ def user_group_delete(operation_logger, groupname, force=False, sync_perm=True):
@is_unit_operation([("groupname", "group")])
def user_group_update(
operation_logger,
groupname,
add=None,
remove=None,
add_mailalias=None,
remove_mailalias=None,
force=False,
sync_perm=True,
from_import=False,
):
operation_logger: OperationLogger,
groupname: str,
add: None | str | list[str] = None,
remove: None | str | list[str] = None,
add_mailalias: None | str | list[str] = None,
remove_mailalias: None | str | list[str] = None,
force: bool = False,
sync_perm: bool = True,
from_import: bool = False,
) -> None | dict[str, Any]:
from yunohost.permission import permission_sync_to_user
from yunohost.utils.ldap import _get_ldap_interface, _ldap_path_extract
@ -1165,7 +1172,7 @@ def user_group_update(
_ldap_path_extract(p, "uid") for p in group.get("member", [])
]
new_group_members = copy.copy(current_group_members)
new_attr_dict = {}
new_attr_dict: dict[str, list] = {}
if add:
users_to_add = [add] if not isinstance(add, list) else add
@ -1205,8 +1212,8 @@ def user_group_update(
new_group_members_dns = [
"uid=" + user + ",ou=users,dc=yunohost,dc=org" for user in new_group_members
]
new_attr_dict["member"] = set(new_group_members_dns)
new_attr_dict["memberUid"] = set(new_group_members)
new_attr_dict["member"] = list(set(new_group_members_dns))
new_attr_dict["memberUid"] = list(set(new_group_members))
# Check the whole alias situation
if add_mailalias:
@ -1258,7 +1265,7 @@ def user_group_update(
if set(new_group_mail) != set(current_group_mail):
logger.info(m18n.n("group_update_aliases", group=groupname))
new_attr_dict["mail"] = set(new_group_mail)
new_attr_dict["mail"] = list(set(new_group_mail))
if new_attr_dict["mail"] and "mailGroup" not in group["objectClass"]:
new_attr_dict["objectClass"] = group["objectClass"] + ["mailGroup"]
@ -1296,8 +1303,10 @@ def user_group_update(
return user_group_info(groupname)
return None
def user_group_info(groupname):
def user_group_info(groupname: str) -> dict[str, Any]:
"""
Get user informations
@ -1333,7 +1342,7 @@ def user_group_info(groupname):
}
def user_group_add(groupname, usernames, force=False, sync_perm=True):
def user_group_add(groupname: str, usernames: list[str], force: bool = False, sync_perm: bool = True) -> Optional[dict[str, Any]]:
"""
Add user(s) to a group
@ -1345,7 +1354,7 @@ def user_group_add(groupname, usernames, force=False, sync_perm=True):
return user_group_update(groupname, add=usernames, force=force, sync_perm=sync_perm)
def user_group_remove(groupname, usernames, force=False, sync_perm=True):
def user_group_remove(groupname: str, usernames: list[str], force: bool = False, sync_perm: bool = True) -> Optional[dict[str, Any]]:
"""
Remove user(s) from a group
@ -1359,11 +1368,11 @@ def user_group_remove(groupname, usernames, force=False, sync_perm=True):
)
def user_group_add_mailalias(groupname, aliases):
def user_group_add_mailalias(groupname: str, aliases: list[str]) -> Optional[dict[str, Any]]:
return user_group_update(groupname, add_mailalias=aliases, sync_perm=False)
def user_group_remove_mailalias(groupname, aliases):
def user_group_remove_mailalias(groupname: str, aliases: list[str]) -> Optional[dict[str, Any]]:
return user_group_update(groupname, remove_mailalias=aliases, sync_perm=False)
@ -1371,14 +1380,15 @@ def user_group_remove_mailalias(groupname, aliases):
# Permission subcategory
#
def user_permission_list(short=False, full=False, apps=[]):
# FIXME: missing return type
def user_permission_list(short: bool = False, full: bool = False, apps: list[str] = []):
from yunohost.permission import user_permission_list
return user_permission_list(short, full, absolute_urls=True, apps=apps)
def user_permission_update(permission, label=None, show_tile=None, sync_perm=True):
# FIXME: missing return type
def user_permission_update(permission: str, label: Optional[str] = None, show_tile: Optional[bool] = None, sync_perm: bool = True):
from yunohost.permission import user_permission_update
return user_permission_update(
@ -1386,7 +1396,8 @@ def user_permission_update(permission, label=None, show_tile=None, sync_perm=Tru
)
def user_permission_add(permission, names, protected=None, force=False, sync_perm=True):
# FIXME: missing return type
def user_permission_add(permission: str, names: list[str], protected: Optional[bool] = None, force: bool = False, sync_perm: bool = True):
from yunohost.permission import user_permission_update
return user_permission_update(
@ -1394,8 +1405,9 @@ def user_permission_add(permission, names, protected=None, force=False, sync_per
)
# FIXME: missing return type
def user_permission_remove(
permission, names, protected=None, force=False, sync_perm=True
permission: str, names: list[str], protected: Optional[bool] = None, force: bool = False, sync_perm: bool = True
):
from yunohost.permission import user_permission_update
@ -1404,13 +1416,15 @@ def user_permission_remove(
)
def user_permission_reset(permission, sync_perm=True):
# FIXME: missing return type
def user_permission_reset(permission: str, sync_perm: bool = True):
from yunohost.permission import user_permission_reset
return user_permission_reset(permission, sync_perm=sync_perm)
def user_permission_info(permission):
# FIXME: missing return type
def user_permission_info(permission: str):
from yunohost.permission import user_permission_info
return user_permission_info(permission)
@ -1422,15 +1436,15 @@ def user_permission_info(permission):
import yunohost.ssh
def user_ssh_list_keys(username):
def user_ssh_list_keys(username: str) -> dict[str, dict[str, str]]:
return yunohost.ssh.user_ssh_list_keys(username)
def user_ssh_add_key(username, key, comment):
def user_ssh_add_key(username: str, key: str, comment: Optional[str] = None) -> None:
return yunohost.ssh.user_ssh_add_key(username, key, comment)
def user_ssh_remove_key(username, key):
def user_ssh_remove_key(username: str, key: str) -> None:
return yunohost.ssh.user_ssh_remove_key(username, key)
@ -1438,7 +1452,7 @@ def user_ssh_remove_key(username, key):
# End SSH subcategory
#
def _update_admins_group_aliases(old_main_domain, new_main_domain):
def _update_admins_group_aliases(old_main_domain: str, new_main_domain: str) -> None:
current_admin_aliases = user_group_info("admins")["mail-aliases"]
aliases_to_remove = [

View file

@ -1783,7 +1783,7 @@ class GroupOption(BaseChoicesOption):
# TODO remove calls to resources in validators (pydantic V2 should adress this)
from yunohost.user import user_group_list
groups = user_group_list(short=True, include_primary_groups=False)["groups"]
groups = list(user_group_list(include_primary_groups=False)["groups"].keys())
def _human_readable_group(groupname):
# i18n: visitors