From ebdb1e22eec02656d6f43ae3ffc97e1384cd0b75 Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Fri, 9 Jul 2021 20:42:11 +0200 Subject: [PATCH] Further attempt to simplify authentication management code + get rid of msignals madness --- locales/en.json | 1 - moulinette/__init__.py | 7 ++ moulinette/actionsmap.py | 23 ++-- moulinette/authentication.py | 112 +------------------ moulinette/interfaces/__init__.py | 6 +- moulinette/interfaces/api.py | 178 +++++++++++------------------- moulinette/interfaces/cli.py | 2 +- test/actionsmap/moulitest.yml | 1 + test/test_actionsmap.py | 66 +++++++---- test/test_auth.py | 35 ++---- 10 files changed, 144 insertions(+), 287 deletions(-) diff --git a/locales/en.json b/locales/en.json index 5522d16c..75476f89 100644 --- a/locales/en.json +++ b/locales/en.json @@ -1,7 +1,6 @@ { "argument_required": "Argument '{argument}' is required", "authentication_required": "Authentication required", - "authentication_required_long": "Authentication is required to perform this action", "colon": "{}: ", "confirm": "Confirm {prompt}", "deprecated_command": "'{prog} {command}' is deprecated and will be removed in the future", diff --git a/moulinette/__init__.py b/moulinette/__init__.py index 16e62392..c4e9913d 100755 --- a/moulinette/__init__.py +++ b/moulinette/__init__.py @@ -40,6 +40,13 @@ msettings = dict() m18n = Moulinette18n() +def prompt(**kwargs): + return msettings["interface"].prompt(**kwargs) + + +def display(**kwargs): + return msettings["interface"].display(**kwargs) + # Package functions diff --git a/moulinette/actionsmap.py b/moulinette/actionsmap.py index d5952dc0..419db221 100644 --- a/moulinette/actionsmap.py +++ b/moulinette/actionsmap.py @@ -20,7 +20,7 @@ from moulinette.core import ( MoulinetteAuthenticationError, MoulinetteValidationError, ) -from moulinette.interfaces import BaseActionsMapParser, GLOBAL_SECTION, TO_RETURN_PROP +from moulinette.interfaces import BaseActionsMapParser, TO_RETURN_PROP from moulinette.utils.log import start_action_logging logger = logging.getLogger("moulinette.actionsmap") @@ -42,7 +42,6 @@ class _ExtraParameter(object): """ def __init__(self, iface): - # TODO: Add conn argument which contains authentification object self.iface = iface # Required variables @@ -98,7 +97,7 @@ class CommentParameter(_ExtraParameter): def __call__(self, message, arg_name, arg_value): if arg_value is None: return - return self.iface.display(m18n.n(message)) + return msettings['interface'].display(m18n.n(message)) @classmethod def validate(klass, value, arg_name): @@ -135,7 +134,7 @@ class AskParameter(_ExtraParameter): try: # Ask for the argument value - return self.iface.prompt(m18n.n(message)) + return msettings['interface'].prompt(m18n.n(message)) except NotImplementedError: return arg_value @@ -173,7 +172,7 @@ class PasswordParameter(AskParameter): try: # Ask for the password - return self.iface.prompt(m18n.n(message), True, True) + return msettings['interface'].prompt(m18n.n(message), True, True) except NotImplementedError: return arg_value @@ -284,7 +283,7 @@ class ExtraArgumentParser(object): def __init__(self, iface): self.iface = iface self.extra = OrderedDict() - self._extra_params = {GLOBAL_SECTION: {}} + self._extra_params = {"_global": {}} # Append available extra parameters for the current interface for klass in extraparameters_list: @@ -326,7 +325,7 @@ class ExtraArgumentParser(object): Add extra parameters to apply on an action argument Keyword arguments: - - tid -- The tuple identifier of the action or GLOBAL_SECTION + - tid -- The tuple identifier of the action or _global for global extra parameters - arg_name -- The argument name - parameters -- A dict of extra parameters with their values @@ -349,7 +348,7 @@ class ExtraArgumentParser(object): - args -- A dict of argument name associated to their value """ - extra_args = OrderedDict(self._extra_params.get(GLOBAL_SECTION, {})) + extra_args = OrderedDict(self._extra_params.get("_global", {})) extra_args.update(self._extra_params.get(tid, {})) # Iterate over action arguments with extra parameters @@ -492,16 +491,15 @@ class ActionsMap(object): else: return mod.Authenticator() - def check_authentication_if_required(self, args, **kwargs): + def check_authentication_if_required(self, *args, **kwargs): - auth_method = self.parser.auth_method(args, **kwargs) + auth_method = self.parser.auth_method(*args, **kwargs) if auth_method is None: return authenticator = self.get_authenticator(auth_method) - if not msettings['interface'].authenticate(authenticator): - raise MoulinetteAuthenticationError("authentication_required_long") + msettings['interface'].authenticate(authenticator) def process(self, args, timeout=None, **kwargs): """ @@ -707,6 +705,7 @@ class ActionsMap(object): ) else: self.main_namespace = namespace + self.name = _global["name"] self.default_authentication = _global["authentication"][ interface_type ] diff --git a/moulinette/authentication.py b/moulinette/authentication.py index a14a0b8a..af8246e6 100644 --- a/moulinette/authentication.py +++ b/moulinette/authentication.py @@ -37,119 +37,11 @@ class BaseAuthenticator(object): try: # Attempt to authenticate - self._authenticate_credentials(credentials) + auth_info = self._authenticate_credentials(credentials) or {} except MoulinetteError: raise except Exception as e: logger.exception(f"authentication {self.name} failed because '{e}'") raise MoulinetteAuthenticationError("unable_authenticate") - # Store session for later using the provided (new) token if any - if store_session: - try: - s_id = random_ascii() - s_token = random_ascii() - self._store_session(s_id, s_token) - except Exception as e: - import traceback - - traceback.print_exc() - logger.exception(f"unable to store session because {e}") - else: - logger.debug("session has been stored") - - def _authenticate_credentials(self, credentials=None): - """Attempt to authenticate - - Attempt to authenticate with given credentials. It should raise an - AuthenticationError exception if authentication fails. - - Keyword arguments: - - credentials -- A string containing the credentials to be used by the authenticator - - """ - raise NotImplementedError( - "derived class '%s' must override this method" % self.__class__.__name__ - ) - - def _open_sessionfile(self, session_id, mode="r"): - """Open a session file for this instance in given mode""" - return open_cachefile( - "%s.asc" % session_id, mode, subdir="session/%s" % self.name - ) - - def _session_exists(self, session_id): - """Check a session exists""" - return cachefile_exists("%s.asc" % session_id, subdir="session/%s" % self.name) - - def _store_session(self, session_id, session_token): - """Store a session to be able to use it later to reauthenticate""" - - # We store a hash of the session_id and the session_token (the token is assumed to be secret) - to_hash = "{id}:{token}".format(id=session_id, token=session_token).encode() - hash_ = hashlib.sha256(to_hash).hexdigest() - with self._open_sessionfile(session_id, "w") as f: - f.write(hash_) - - def authenticate_session(self, s_id, s_token): - try: - # Attempt to authenticate - self._authenticate_session(s_id, s_token) - except MoulinetteError: - raise - except Exception as e: - logger.exception(f"authentication {self.name} failed because '{e}'") - raise MoulinetteAuthenticationError("unable_authenticate") - - def _authenticate_session(self, session_id, session_token): - """Checks session and token against the stored session token""" - if not self._session_exists(session_id): - raise MoulinetteAuthenticationError("session_expired") - try: - # FIXME : shouldn't we also add a check that this session file - # is not too old ? e.g. not older than 24 hours ? idk... - - with self._open_sessionfile(session_id, "r") as f: - stored_hash = f.read() - except IOError as e: - logger.debug("unable to retrieve session", exc_info=1) - raise MoulinetteAuthenticationError("unable_retrieve_session", exception=e) - else: - # - # session_id (or just id) : This is unique id for the current session from the user. Not too important - # if this info gets stolen somehow. It is stored in the client's side (browser) using regular cookies. - # - # session_token (or just token) : This is a secret info, like some sort of ephemeral password, - # used to authenticate the session without the user having to retype the password all the time... - # - It is generated on our side during the initial auth of the user (which happens with the actual admin password) - # - It is stored on the client's side (browser) using (signed) cookies. - # - We also store it on our side in the form of a hash of {id}:{token} (c.f. _store_session). - # We could simply store the raw token, but hashing it is an additonal low-cost security layer - # in case this info gets exposed for some reason (e.g. bad file perms for reasons...) - # - # When the user comes back, we fetch the session_id and session_token from its cookies. Then we - # re-hash the {id}:{token} and compare it to the previously stored hash for this session_id ... - # It it matches, then the user is authenticated. Otherwise, the token is invalid. - # - to_hash = "{id}:{token}".format(id=session_id, token=session_token).encode() - hash_ = hashlib.sha256(to_hash).hexdigest() - - if not hmac.compare_digest(hash_, stored_hash): - raise MoulinetteAuthenticationError("invalid_token") - else: - return - - def _clean_session(self, session_id): - """Clean a session cache - - Remove cache for the session 'session_id' and for this authenticator profile - - Keyword arguments: - - session_id -- The session id to clean - """ - sessiondir = get_cachedir("session") - - try: - os.remove(os.path.join(sessiondir, self.name, "%s.asc" % session_id)) - except OSError: - pass + return auth_info diff --git a/moulinette/interfaces/__init__.py b/moulinette/interfaces/__init__.py index b5385ded..5d8821cb 100644 --- a/moulinette/interfaces/__init__.py +++ b/moulinette/interfaces/__init__.py @@ -11,7 +11,7 @@ from moulinette.core import MoulinetteError logger = logging.getLogger("moulinette.interface") -GLOBAL_SECTION = "_global" +# FIXME : are these even used for anything useful ... TO_RETURN_PROP = "_to_return" CALLBACKS_PROP = "_callbacks" @@ -114,7 +114,7 @@ class BaseActionsMapParser(object): "derived class '%s' must override this method" % self.__class__.__name__ ) - def auth_method(self, args, **kwargs): + def auth_method(self, *args, **kwargs): """Check if authentication is required to run the requested action Keyword arguments: @@ -156,7 +156,7 @@ class BaseActionsMapParser(object): ): raise MoulinetteError("invalid_usage") elif not tid: - tid = GLOBAL_SECTION + tid = "_global" # Prepare namespace if namespace is None: diff --git a/moulinette/interfaces/api.py b/moulinette/interfaces/api.py index 818cdb2a..55fe66d4 100644 --- a/moulinette/interfaces/api.py +++ b/moulinette/interfaces/api.py @@ -61,7 +61,7 @@ def filter_csrf(callback): class LogQueues(dict): - """Map of session id to queue.""" + """Map of session ids to queue.""" pass @@ -78,9 +78,10 @@ class APIQueueHandler(logging.Handler): self.queues = LogQueues() def emit(self, record): + s_id = Session.get_infos()["id"] sid = request.get_cookie("session.id") try: - queue = self.queues[sid] + queue = self.queues[s_id] except KeyError: # Session is not initialized, abandon. return @@ -209,6 +210,34 @@ class _HTTPArgumentParser(object): raise MoulinetteValidationError(message, raw_msg=True) +class Session(): + + secret = random_ascii() + actionsmap_name = None # This is later set to the actionsmap name + + def set_infos(infos): + + assert isinstance(infos, dict) + + response.set_cookie(f"session.{Session.actionsmap_name}", infos, secure=True, secret=Session.secret) + + def get_infos(): + + try: + infos = request.get_cookie(f"session.{Session.actionsmap_name}", secret=Session.secret, default={}) + except Exception: + infos = {} + + if "id" not in infos: + infos["id"] = random_ascii() + + return infos + + def delete_infos(): + + response.set_cookie(f"session.{Session.actionsmap_name}", "", max_age=-1) + + class _ActionsMapPlugin(object): """Actions map Bottle Plugin @@ -228,8 +257,7 @@ class _ActionsMapPlugin(object): self.actionsmap = actionsmap self.log_queues = log_queues - # TODO: Save and load secrets? - self.secrets = {} + Session.actionsmap_name = actionsmap.name def setup(self, app): """Setup plugin on the application @@ -240,36 +268,6 @@ class _ActionsMapPlugin(object): - app -- The application instance """ - # Login wrapper - def _login(callback): - def wrapper(): - kwargs = {} - try: - kwargs["credentials"] = request.POST.credentials - except KeyError: - raise HTTPResponse("Missing credentials parameter", 400) - - # Apparently even if the key doesn't exists, request.POST.foobar just returns empty string... - if not kwargs["credentials"]: - raise HTTPResponse("Missing credentials parameter", 400) - - kwargs["profile"] = request.POST.get( - "profile", self.actionsmap.default_authentication - ) - return callback(**kwargs) - - return wrapper - - # Logout wrapper - def _logout(callback): - def wrapper(): - kwargs = {} - kwargs["profile"] = request.POST.get( - "profile", self.actionsmap.default_authentication - ) - return callback(**kwargs) - - return wrapper # Append authentication routes app.route( @@ -278,7 +276,6 @@ class _ActionsMapPlugin(object): method="POST", callback=self.login, skip=["actionsmap"], - apply=_login, ) app.route( "/logout", @@ -286,7 +283,6 @@ class _ActionsMapPlugin(object): method="GET", callback=self.logout, skip=["actionsmap"], - apply=_logout, ) # Append messages route @@ -347,106 +343,61 @@ class _ActionsMapPlugin(object): # Routes callbacks - def login(self, credentials, profile): - """Log in to an authenticator profile + def login(self): + """Log in to an authenticator - Attempt to authenticate to a given authenticator profile and + Attempt to authenticate to the default authenticator and register it with the current session - a new one will be created if needed. - Keyword arguments: - - credentials -- Some credentials to use for login - - profile -- The authenticator profile name to log in - """ + + credentials = request.POST.credentials + # Apparently even if the key doesn't exists, request.POST.foobar just returns empty string... + if not credentials: + raise HTTPResponse("Missing credentials parameter", 400) + + profile = request.POST.profile + if not profile: + profile = self.actionsmap.default_authentication + authenticator = self.actionsmap.get_authenticator(profile) try: - s_id, s_token = authenticator.authenticate_credentials(credentials, store_session=True) + auth_info = authenticator.authenticate_credentials(credentials, store_session=True) + session_infos = Session.get_infos() + session_infos[profile] = auth_info except MoulinetteError as e: try: - self.logout(profile) + self.logout() except Exception: pass # FIXME : replace with MoulinetteAuthenticationError !? raise HTTPResponse(e.strerror, 401) else: - # Save session id and token - - # Create and save (in RAM) new cookie secret used to secure(=sign?) the cookie - self.secrets[s_id] = s_secret = random_ascii() - - # Fetch current token per profile - try: - s_tokens = request.get_cookie("session.tokens", secret=s_secret) or {} - except Exception: - # Same as for session.id a few lines before - s_tokens = {} - - # Update dicts with new values - s_tokens[profile] = s_token - - response.set_cookie("session.id", s_id, secure=True) - response.set_cookie( - "session.tokens", {""}, secure=True, secret=s_secret - ) + Session.set_infos(session_infos) return m18n.g("logged_in") # This is called before each time a route is going to be processed def authenticate(self, authenticator): - s_id = request.get_cookie("session.id") try: - s_secret = self.secrets[s_id] - s_token = request.get_cookie("session.tokens", secret=s_secret, default={})[ - authenticator.name - ] + session_infos = Session.get_infos()[authenticator.name] except KeyError: msg = m18n.g("authentication_required") raise HTTPResponse(msg, 401) - else: - authenticator.authenticate_session(s_id, s_token) - def logout(self, profile): - """Log out from an authenticator profile + return session_infos - Attempt to unregister a given profile - or all by default - from - the current session. - - Keyword arguments: - - profile -- The authenticator profile name to log out - - """ - # Retrieve session values + def logout(self): try: - s_id = request.get_cookie("session.id") or None - except: - # Super rare case where there are super weird cookie / cache issue - # Previous line throws a CookieError that creates a 500 error ... - # So let's catch it and just use None... - s_id = None - - if s_id is not None: - - # We check that there's a (signed) session.hash available - # for additional security ? - # (An attacker could not craft such signed hashed ? (FIXME : need to make sure of this)) - try: - s_secret = self.secrets[s_id] - except KeyError: - s_secret = {} - if profile not in request.get_cookie( - "session.tokens", secret=s_secret, default={} - ): - raise HTTPResponse(m18n.g("not_logged_in"), 401) - else: - del self.secrets[s_id] - authenticator = self.actionsmap.get_authenticator(profile) - authenticator._clean_session(s_id) - # TODO: Clean the session for profile only - # Delete cookie and clean the session - response.set_cookie("session.tokens", "", max_age=-1) - return m18n.g("logged_out") + Session.get_infos() + except KeyError: + raise HTTPResponse(m18n.g("not_logged_in"), 401) + else: + # Delete cookie and clean the session + Session.delete_infos() + return m18n.g("logged_out") def messages(self): """Listen to the messages WebSocket stream @@ -454,7 +405,7 @@ class _ActionsMapPlugin(object): Retrieve the WebSocket stream and send to it each messages displayed by the display method. They are JSON encoded as a dict { style: message }. """ - s_id = request.get_cookie("session.id") + s_id = Session.get_infos()["id"] try: queue = self.log_queues[s_id] except KeyError: @@ -515,7 +466,8 @@ class _ActionsMapPlugin(object): finally: # Close opened WebSocket by putting StopIteration in the queue try: - queue = self.log_queues[request.get_cookie("session.id")] + s_id = Session.get_infos()["id"] + queue = self.log_queues[s_id] except KeyError: pass else: @@ -523,7 +475,7 @@ class _ActionsMapPlugin(object): def display(self, message, style): - s_id = request.get_cookie("session.id") + s_id = Sesson.get_infos()["id"] try: queue = self.log_queues[s_id] except KeyError: @@ -659,7 +611,7 @@ class ActionsMapParser(BaseActionsMapParser): # Return the created parser return parser - def auth_method(self, args, route, **kwargs): + def auth_method(self, _, route): try: # Retrieve the tid for the route diff --git a/moulinette/interfaces/cli.py b/moulinette/interfaces/cli.py index 07c3d661..84a204eb 100644 --- a/moulinette/interfaces/cli.py +++ b/moulinette/interfaces/cli.py @@ -397,7 +397,7 @@ class ActionsMapParser(BaseActionsMapParser): self.global_parser.add_argument(*names, **argument_options) - def auth_method(self, args, **kwargs): + def auth_method(self, args): # FIXME? idk .. this try/except is duplicated from parse_args below # Just to be able to obtain the tid try: diff --git a/test/actionsmap/moulitest.yml b/test/actionsmap/moulitest.yml index c59c250b..4a51e48d 100644 --- a/test/actionsmap/moulitest.yml +++ b/test/actionsmap/moulitest.yml @@ -3,6 +3,7 @@ # Global parameters # ############################# _global: + name: moulitest authentication: api: dummy cli: dummy diff --git a/test/test_actionsmap.py b/test/test_actionsmap.py index 57348fc5..259fc78b 100644 --- a/test/test_actionsmap.py +++ b/test/test_actionsmap.py @@ -10,9 +10,8 @@ from moulinette.actionsmap import ( ActionsMap, ) -from moulinette.interfaces import GLOBAL_SECTION from moulinette.core import MoulinetteError -from moulinette import m18n +from moulinette import m18n, msettings @pytest.fixture @@ -74,6 +73,7 @@ def test_ask_parameter(iface, mocker): from moulinette.core import Moulinette18n + msettings["interface"] = iface mocker.patch.object(Moulinette18n, "n", return_value="awesome_test") mocker.patch.object(iface, "prompt", return_value="awesome_test") arg = ask("foobar", "a", None) @@ -87,6 +87,7 @@ def test_password_parameter(iface, mocker): from moulinette.core import Moulinette18n + msettings["interface"] = iface mocker.patch.object(Moulinette18n, "n", return_value="awesome_test") mocker.patch.object(iface, "prompt", return_value="awesome_test") arg = ask("foobar", "a", None) @@ -179,17 +180,17 @@ def test_extra_argument_parser_add_argument(iface): assert extra_argument_parse._extra_params["Test"]["foo"]["ask"] == "lol" extra_argument_parse = ExtraArgumentParser(iface) - extra_argument_parse.add_argument(GLOBAL_SECTION, "foo", {"ask": "lol"}) - assert GLOBAL_SECTION in extra_argument_parse._extra_params - assert "foo" in extra_argument_parse._extra_params[GLOBAL_SECTION] - assert "ask" in extra_argument_parse._extra_params[GLOBAL_SECTION]["foo"] - assert extra_argument_parse._extra_params[GLOBAL_SECTION]["foo"]["ask"] == "lol" + extra_argument_parse.add_argument("_global", "foo", {"ask": "lol"}) + assert "_global" in extra_argument_parse._extra_params + assert "foo" in extra_argument_parse._extra_params["_global"] + assert "ask" in extra_argument_parse._extra_params["_global"]["foo"] + assert extra_argument_parse._extra_params["_global"]["foo"]["ask"] == "lol" def test_extra_argument_parser_add_argument_bad_arg(iface): extra_argument_parse = ExtraArgumentParser(iface) with pytest.raises(MoulinetteError) as exception: - extra_argument_parse.add_argument(GLOBAL_SECTION, "foo", {"ask": 1}) + extra_argument_parse.add_argument("_global", "foo", {"ask": 1}) expected_msg = "unable to validate extra parameter '%s' for argument '%s': %s" % ( "ask", @@ -199,23 +200,23 @@ def test_extra_argument_parser_add_argument_bad_arg(iface): assert expected_msg in str(exception) extra_argument_parse = ExtraArgumentParser(iface) - extra_argument_parse.add_argument(GLOBAL_SECTION, "foo", {"error": 1}) + extra_argument_parse.add_argument("_global", "foo", {"error": 1}) - assert GLOBAL_SECTION in extra_argument_parse._extra_params - assert "foo" in extra_argument_parse._extra_params[GLOBAL_SECTION] - assert not len(extra_argument_parse._extra_params[GLOBAL_SECTION]["foo"]) + assert "_global" in extra_argument_parse._extra_params + assert "foo" in extra_argument_parse._extra_params["_global"] + assert not len(extra_argument_parse._extra_params["_global"]["foo"]) def test_extra_argument_parser_parse_args(iface, mocker): extra_argument_parse = ExtraArgumentParser(iface) - extra_argument_parse.add_argument(GLOBAL_SECTION, "foo", {"ask": "lol"}) - extra_argument_parse.add_argument(GLOBAL_SECTION, "foo2", {"ask": "lol2"}) + extra_argument_parse.add_argument("_global", "foo", {"ask": "lol"}) + extra_argument_parse.add_argument("_global", "foo2", {"ask": "lol2"}) extra_argument_parse.add_argument( - GLOBAL_SECTION, "bar", {"password": "lul", "ask": "lul"} + "_global", "bar", {"password": "lul", "ask": "lul"} ) args = extra_argument_parse.parse_args( - GLOBAL_SECTION, {"foo": 1, "foo2": ["a", "b", {"foobar": True}], "bar": "rab"} + "_global", {"foo": 1, "foo2": ["a", "b", {"foobar": True}], "bar": "rab"} ) assert "foo" in args @@ -231,22 +232,32 @@ def test_extra_argument_parser_parse_args(iface, mocker): def test_actions_map_api(): from moulinette.interfaces.api import ActionsMapParser - amap = ActionsMap(ActionsMapParser()) + parser = ActionsMapParser() + amap = ActionsMap(parser) assert amap.main_namespace == "moulitest" assert amap.default_authentication == "dummy" assert ("GET", "/test-auth/default") in amap.parser.routes assert ("POST", "/test-auth/subcat/post") in amap.parser.routes + assert parser.auth_method(None, ("GET", "/test-auth/default")) == "dummy" + assert parser.auth_method(None, ("GET", "/test-auth/only-api")) == "dummy" + assert parser.auth_method(None, ("GET", "/test-auth/only-cli")) is None + amap.generate_cache("moulitest") - amap = ActionsMap(ActionsMapParser()) + parser = ActionsMapParser() + amap = ActionsMap(parser) assert amap.main_namespace == "moulitest" assert amap.default_authentication == "dummy" assert ("GET", "/test-auth/default") in amap.parser.routes assert ("POST", "/test-auth/subcat/post") in amap.parser.routes + assert parser.auth_method(None, ("GET", "/test-auth/default")) == "dummy" + assert parser.auth_method(None, ("GET", "/test-auth/only-api")) == "dummy" + assert parser.auth_method(None, ("GET", "/test-auth/only-cli")) is None + def test_actions_map_import_error(mocker): from moulinette.interfaces.api import ActionsMapParser @@ -281,14 +292,16 @@ def test_actions_map_cli(): from moulinette.interfaces.cli import ActionsMapParser import argparse - parser = argparse.ArgumentParser(add_help=False) - parser.add_argument( + top_parser = argparse.ArgumentParser(add_help=False) + top_parser.add_argument( "--debug", action="store_true", default=False, help="Log and print debug messages", ) - amap = ActionsMap(ActionsMapParser(top_parser=parser)) + + parser = ActionsMapParser(top_parser=top_parser) + amap = ActionsMap(parser) assert amap.main_namespace == "moulitest" assert amap.default_authentication == "dummy" @@ -304,9 +317,14 @@ def test_actions_map_cli(): .choices ) + assert parser.auth_method(["testauth", "default"]) == "dummy" + assert parser.auth_method(["testauth", "only-api"]) is None + assert parser.auth_method(["testauth", "only-cli"]) == "dummy" + amap.generate_cache("moulitest") - amap = ActionsMap(ActionsMapParser(top_parser=parser)) + parser = ActionsMapParser(top_parser=top_parser) + amap = ActionsMap(parser) assert amap.main_namespace == "moulitest" assert amap.default_authentication == "dummy" @@ -321,3 +339,7 @@ def test_actions_map_cli(): ._actions[1] .choices ) + + assert parser.auth_method(["testauth", "default"]) == "dummy" + assert parser.auth_method(["testauth", "only-api"]) is None + assert parser.auth_method(["testauth", "only-cli"]) == "dummy" diff --git a/test/test_auth.py b/test/test_auth.py index d4f74162..51ad2007 100644 --- a/test/test_auth.py +++ b/test/test_auth.py @@ -11,6 +11,7 @@ class TestAuthAPI: password = "dummy" data = {"credentials": password} + if profile: data["profile"] = profile @@ -67,13 +68,7 @@ class TestAuthAPI: def test_login(self, moulinette_webapi): assert self.login(moulinette_webapi).text == "Logged in" - assert "session.id" in moulinette_webapi.cookies - assert "session.tokens" in moulinette_webapi.cookies - - cache_session_default = os.environ["MOULINETTE_CACHE_DIR"] + "/session/dummy/" - assert moulinette_webapi.cookies["session.id"] + ".asc" in os.listdir( - cache_session_default - ) + assert "session.moulitest" in moulinette_webapi.cookies def test_login_bad_password(self, moulinette_webapi): assert ( @@ -81,8 +76,7 @@ class TestAuthAPI: == "Invalid password" ) - assert "session.id" not in moulinette_webapi.cookies - assert "session.tokens" not in moulinette_webapi.cookies + assert "session.moulitest" not in moulinette_webapi.cookies def test_login_csrf_attempt(self, moulinette_webapi): # C.f. @@ -93,8 +87,7 @@ class TestAuthAPI: "CSRF protection" in self.login(moulinette_webapi, csrf=True, status=403).text ) - assert not any(c.name == "session.id" for c in moulinette_webapi.cookiejar) - assert not any(c.name == "session.tokens" for c in moulinette_webapi.cookiejar) + assert not any(c.name == "session.moulitest" for c in moulinette_webapi.cookiejar) def test_login_then_legit_request_without_cookies(self, moulinette_webapi): self.login(moulinette_webapi) @@ -106,6 +99,11 @@ class TestAuthAPI: def test_login_then_legit_request(self, moulinette_webapi): self.login(moulinette_webapi) + assert "session.moulitest" in moulinette_webapi.cookies + print("====================cookie") + print(moulinette_webapi.cookiejar) + print(moulinette_webapi.cookies) + assert ( moulinette_webapi.get("/test-auth/default", status=200).text == '"some_data_from_default"' @@ -121,11 +119,6 @@ class TestAuthAPI: moulinette_webapi.get("/logout", status=200) - cache_session_default = os.environ["MOULINETTE_CACHE_DIR"] + "/session/dummy/" - assert not moulinette_webapi.cookies["session.id"] + ".asc" in os.listdir( - cache_session_default - ) - assert ( moulinette_webapi.get("/test-auth/default", status=401).text == "Authentication required" @@ -134,15 +127,7 @@ class TestAuthAPI: def test_login_other_profile(self, moulinette_webapi): self.login(moulinette_webapi, profile="yoloswag", password="yoloswag") - assert "session.id" in moulinette_webapi.cookies - assert "session.tokens" in moulinette_webapi.cookies - - cache_session_default = ( - os.environ["MOULINETTE_CACHE_DIR"] + "/session/yoloswag/" - ) - assert moulinette_webapi.cookies["session.id"] + ".asc" in os.listdir( - cache_session_default - ) + assert "session.moulitest" in moulinette_webapi.cookies def test_login_wrong_profile(self, moulinette_webapi): self.login(moulinette_webapi)