portalapi/sso: add a first bunch of unit tests

This commit is contained in:
Alexandre Aubin 2023-12-27 02:31:06 +01:00
parent 29cac1791d
commit 7f02fcd985
4 changed files with 218 additions and 16 deletions

View file

@ -187,3 +187,8 @@ test-ldapauth:
# only:
# changes:
# - src/authenticators/*.py
test-sso-and-portalapi:
extends: .test-stage
script:
- python3 -m pytest src/tests/test_sso_and_portalapi.py

View file

@ -22,8 +22,8 @@ import logging
import ldap
import ldap.sasl
import time
import glob
import hashlib
from pathlib import Path
from moulinette import m18n
from moulinette.authentication import BaseAuthenticator
@ -212,21 +212,18 @@ class Authenticator(BaseAuthenticator):
def purge_expired_session_files(self):
for session_id in os.listdir(SESSION_FOLDER):
session_file = f"{SESSION_FOLDER}/{session_id}"
if abs(os.path.getctime(session_file) - time.time()) > SESSION_VALIDITY:
for session_file in Path(SESSION_FOLDER).iterdir():
if abs(session_file.stat().st_mtime - time.time()) > SESSION_VALIDITY:
try:
os.remove(session_file)
session_file.unlink()
except Exception as e:
logger.debug(f"Failed to delete session file {session_file} ? {e}")
@staticmethod
def invalidate_all_sessions_for_user(user):
for path in glob.glob(f"{SESSION_FOLDER}/{short_hash(user)}*"):
try:
logger.info(path)
os.remove(path)
except Exception as e:
logger.debug(f"Failed to delete session file {path} ? {e}")
logger.info(str(glob.glob(f"{SESSION_FOLDER}/*")))
for file in Path(SESSION_FOLDER).glob(f"{short_hash(user)}*"):
try:
file.unlink()
except Exception as e:
logger.debug(f"Failed to delete session file {file} ? {e}")

View file

@ -58,13 +58,13 @@ def user_is_allowed_on_domain(user: str, domain: str) -> bool:
parent_domain = domain.split(".", 1)[-1]
return user_is_allowed_on_domain(user, parent_domain)
ctime = portal_settings_path.stat().st_ctime
if domain not in DOMAIN_USER_ACL_DICT or DOMAIN_USER_ACL_DICT[domain]["ctime"] < time.time():
mtime = portal_settings_path.stat().st_mtime
if domain not in DOMAIN_USER_ACL_DICT or DOMAIN_USER_ACL_DICT[domain]["mtime"] < time.time():
users: set[str] = set()
for infos in read_json(str(portal_settings_path))["apps"].values():
users = users.union(infos["users"])
DOMAIN_USER_ACL_DICT[domain] = {}
DOMAIN_USER_ACL_DICT[domain]["ctime"] = ctime
DOMAIN_USER_ACL_DICT[domain]["mtime"] = mtime
DOMAIN_USER_ACL_DICT[domain]["users"] = users
if user in DOMAIN_USER_ACL_DICT[domain]["users"]:
@ -256,7 +256,8 @@ class Authenticator(BaseAuthenticator):
def purge_expired_session_files(self):
for session_file in Path(SESSION_FOLDER).iterdir():
if abs(session_file.stat().st_ctime - time.time()) > SESSION_VALIDITY:
print(session_file.stat().st_mtime - time.time())
if abs(session_file.stat().st_mtime - time.time()) > SESSION_VALIDITY:
try:
session_file.unlink()
except Exception as e:

View file

@ -0,0 +1,199 @@
import time
import requests
from pathlib import Path
import os
from .conftest import message, raiseYunohostError, get_test_apps_dir
from yunohost.domain import _get_maindomain, domain_add, domain_remove, domain_list
from yunohost.user import user_create, user_list, user_delete
from yunohost.authenticators.ldap_ynhuser import Authenticator, SESSION_FOLDER, short_hash
# Get main domain
maindomain = open("/etc/yunohost/current_host").read().strip()
dummy_password = "test123Ynh"
def setup_function(function):
Authenticator.invalidate_all_sessions_for_user("alice")
assert number_of_active_session_for_user("alice") == 0
def teardown_function(function):
pass
def setup_module(module):
assert os.system("systemctl is-active yunohost-portal-api >/dev/null") == 0
user_create("alice", maindomain, dummy_password, fullname="Alice White", admin=True)
def teardown_module(module):
if "alice" in user_list()["users"]:
user_delete("alice")
def login(session, logged_as):
login_endpoint = f"https://{maindomain}/yunohost/portalapi/login"
r = session.post(
login_endpoint,
data={"credentials": f"{logged_as}:{dummy_password}"},
headers={
"X-Requested-With": "",
},
verify=False,
)
return r
def logout(session):
logout_endpoint = f"https://{maindomain}/yunohost/portalapi/logout"
r = session.get(
logout_endpoint,
headers={
"X-Requested-With": "",
},
verify=False,
)
return r
def number_of_active_session_for_user(user):
return len(list(Path(SESSION_FOLDER).glob(f"{short_hash('alice')}*")))
def request(webpath, logged_as=None, session=None):
webpath = webpath.rstrip("/")
# Anonymous access
if session:
r = session.get(webpath, verify=False)
elif not logged_as:
r = requests.get(webpath, verify=False)
# Login as a user using dummy password
else:
with requests.Session() as session:
r = login(session, logged_as)
# We should have some cookies related to authentication now
assert session.cookies
r = session.get(webpath, verify=False)
# If we can't access it, we got redirected to the SSO
# with `r=<base64_callback_url>` for anonymous access because they're encouraged to log-in,
# and `msg=access_denied` if we are logged but not allowed for this url
# with `r=
#sso_url = f"https://{maindomain}/yunohost/sso/"
#if not logged_as:
# sso_url += "?r="
#else:
# sso_url += "?msg=access_denied"
return r
def test_api_public_as_anonymous():
r = request(f"https://{maindomain}/yunohost/portalapi/public")
assert r.status_code == 200 and "apps" in r.json()
def test_api_me_as_anonymous():
r = request(f"https://{maindomain}/yunohost/portalapi/me")
assert r.status_code == 401
def test_api_login_and_logout():
with requests.Session() as session:
r = login(session, "alice")
assert "yunohost.portal" in session.cookies
assert r.status_code == 200
assert number_of_active_session_for_user("alice") == 1
r = logout(session)
assert number_of_active_session_for_user("alice") == 0
def test_api_login_nonexistinguser():
with requests.Session() as session:
r = login(session, "nonexistent")
assert r.status_code == 401
def test_api_public_and_me_logged_in():
r = request(f"https://{maindomain}/yunohost/portalapi/public", logged_as="alice")
assert r.status_code == 200 and "apps" in r.json()
r = request(f"https://{maindomain}/yunohost/portalapi/me", logged_as="alice")
assert r.status_code == 200 and r.json()["username"] == "alice"
assert number_of_active_session_for_user("alice") == 2
def test_api_session_expired():
with requests.Session() as session:
r = login(session, "alice")
assert "yunohost.portal" in session.cookies
assert r.status_code == 200
r = request(f"https://{maindomain}/yunohost/portalapi/me", session=session)
assert r.status_code == 200 and r.json()["username"] == "alice"
for file in Path(SESSION_FOLDER).glob(f"{short_hash('alice')}*"):
os.utime(str(file), (0, 0))
r = request(f"https://{maindomain}/yunohost/portalapi/me", session=session)
assert number_of_active_session_for_user("alice") == 0
assert r.status_code == 401
def test_public_routes_not_blocked_by_ssowat():
r = request(f"https://{maindomain}/yunohost/api/whatever")
# Getting code 405, Method not allowed, which means the API does answer,
# meaning it's not blocked by ssowat
assert r.status_code == 405
Path("/var/www/.well-known/acme-challenge-public/toto").touch()
r = request(f"http://{maindomain}/.well-known/acme-challenge/toto")
assert r.status_code == 200
r = request(f"http://{maindomain}/.well-known/acme-challenge/nonexistent")
assert r.status_code == 404
# app privée pour alice
# - pas d'accès si pas loggué
# -> redirection ?
# - accès si loggué si alice
# - pas d'accès même si loggué en tant que bob
# accès à l'api portal
# -> test des routes
# apps publique (seulement si activé ?)
# /me
# /update
# accès à une url autorisée mais qui 502 ?
# dummy app qui montre le header remote_user / authentication ?
# accès aux trucs précédent meme avec une app installée sur la racine ?
# ou une app par défaut ?
# accès à un deuxième "domain principal"
# accès à un app sur un sous-domaine
# pas loggué -> redirect vers sso sur domaine principal
# se logger sur API sur domain principal, puis utilisation du cookie sur le sous-domaine