Further attempt to simplify authentication management code + get rid of msignals madness

This commit is contained in:
Alexandre Aubin 2021-07-09 20:42:11 +02:00
parent 6310ef5b6e
commit ebdb1e22ee
10 changed files with 144 additions and 287 deletions

View file

@ -1,7 +1,6 @@
{ {
"argument_required": "Argument '{argument}' is required", "argument_required": "Argument '{argument}' is required",
"authentication_required": "Authentication required", "authentication_required": "Authentication required",
"authentication_required_long": "Authentication is required to perform this action",
"colon": "{}: ", "colon": "{}: ",
"confirm": "Confirm {prompt}", "confirm": "Confirm {prompt}",
"deprecated_command": "'{prog} {command}' is deprecated and will be removed in the future", "deprecated_command": "'{prog} {command}' is deprecated and will be removed in the future",

View file

@ -40,6 +40,13 @@ msettings = dict()
m18n = Moulinette18n() m18n = Moulinette18n()
def prompt(**kwargs):
return msettings["interface"].prompt(**kwargs)
def display(**kwargs):
return msettings["interface"].display(**kwargs)
# Package functions # Package functions

View file

@ -20,7 +20,7 @@ from moulinette.core import (
MoulinetteAuthenticationError, MoulinetteAuthenticationError,
MoulinetteValidationError, MoulinetteValidationError,
) )
from moulinette.interfaces import BaseActionsMapParser, GLOBAL_SECTION, TO_RETURN_PROP from moulinette.interfaces import BaseActionsMapParser, TO_RETURN_PROP
from moulinette.utils.log import start_action_logging from moulinette.utils.log import start_action_logging
logger = logging.getLogger("moulinette.actionsmap") logger = logging.getLogger("moulinette.actionsmap")
@ -42,7 +42,6 @@ class _ExtraParameter(object):
""" """
def __init__(self, iface): def __init__(self, iface):
# TODO: Add conn argument which contains authentification object
self.iface = iface self.iface = iface
# Required variables # Required variables
@ -98,7 +97,7 @@ class CommentParameter(_ExtraParameter):
def __call__(self, message, arg_name, arg_value): def __call__(self, message, arg_name, arg_value):
if arg_value is None: if arg_value is None:
return return
return self.iface.display(m18n.n(message)) return msettings['interface'].display(m18n.n(message))
@classmethod @classmethod
def validate(klass, value, arg_name): def validate(klass, value, arg_name):
@ -135,7 +134,7 @@ class AskParameter(_ExtraParameter):
try: try:
# Ask for the argument value # Ask for the argument value
return self.iface.prompt(m18n.n(message)) return msettings['interface'].prompt(m18n.n(message))
except NotImplementedError: except NotImplementedError:
return arg_value return arg_value
@ -173,7 +172,7 @@ class PasswordParameter(AskParameter):
try: try:
# Ask for the password # Ask for the password
return self.iface.prompt(m18n.n(message), True, True) return msettings['interface'].prompt(m18n.n(message), True, True)
except NotImplementedError: except NotImplementedError:
return arg_value return arg_value
@ -284,7 +283,7 @@ class ExtraArgumentParser(object):
def __init__(self, iface): def __init__(self, iface):
self.iface = iface self.iface = iface
self.extra = OrderedDict() self.extra = OrderedDict()
self._extra_params = {GLOBAL_SECTION: {}} self._extra_params = {"_global": {}}
# Append available extra parameters for the current interface # Append available extra parameters for the current interface
for klass in extraparameters_list: for klass in extraparameters_list:
@ -326,7 +325,7 @@ class ExtraArgumentParser(object):
Add extra parameters to apply on an action argument Add extra parameters to apply on an action argument
Keyword arguments: Keyword arguments:
- tid -- The tuple identifier of the action or GLOBAL_SECTION - tid -- The tuple identifier of the action or _global
for global extra parameters for global extra parameters
- arg_name -- The argument name - arg_name -- The argument name
- parameters -- A dict of extra parameters with their values - parameters -- A dict of extra parameters with their values
@ -349,7 +348,7 @@ class ExtraArgumentParser(object):
- args -- A dict of argument name associated to their value - args -- A dict of argument name associated to their value
""" """
extra_args = OrderedDict(self._extra_params.get(GLOBAL_SECTION, {})) extra_args = OrderedDict(self._extra_params.get("_global", {}))
extra_args.update(self._extra_params.get(tid, {})) extra_args.update(self._extra_params.get(tid, {}))
# Iterate over action arguments with extra parameters # Iterate over action arguments with extra parameters
@ -492,16 +491,15 @@ class ActionsMap(object):
else: else:
return mod.Authenticator() return mod.Authenticator()
def check_authentication_if_required(self, args, **kwargs): def check_authentication_if_required(self, *args, **kwargs):
auth_method = self.parser.auth_method(args, **kwargs) auth_method = self.parser.auth_method(*args, **kwargs)
if auth_method is None: if auth_method is None:
return return
authenticator = self.get_authenticator(auth_method) authenticator = self.get_authenticator(auth_method)
if not msettings['interface'].authenticate(authenticator): msettings['interface'].authenticate(authenticator)
raise MoulinetteAuthenticationError("authentication_required_long")
def process(self, args, timeout=None, **kwargs): def process(self, args, timeout=None, **kwargs):
""" """
@ -707,6 +705,7 @@ class ActionsMap(object):
) )
else: else:
self.main_namespace = namespace self.main_namespace = namespace
self.name = _global["name"]
self.default_authentication = _global["authentication"][ self.default_authentication = _global["authentication"][
interface_type interface_type
] ]

View file

@ -37,119 +37,11 @@ class BaseAuthenticator(object):
try: try:
# Attempt to authenticate # Attempt to authenticate
self._authenticate_credentials(credentials) auth_info = self._authenticate_credentials(credentials) or {}
except MoulinetteError: except MoulinetteError:
raise raise
except Exception as e: except Exception as e:
logger.exception(f"authentication {self.name} failed because '{e}'") logger.exception(f"authentication {self.name} failed because '{e}'")
raise MoulinetteAuthenticationError("unable_authenticate") raise MoulinetteAuthenticationError("unable_authenticate")
# Store session for later using the provided (new) token if any return auth_info
if store_session:
try:
s_id = random_ascii()
s_token = random_ascii()
self._store_session(s_id, s_token)
except Exception as e:
import traceback
traceback.print_exc()
logger.exception(f"unable to store session because {e}")
else:
logger.debug("session has been stored")
def _authenticate_credentials(self, credentials=None):
"""Attempt to authenticate
Attempt to authenticate with given credentials. It should raise an
AuthenticationError exception if authentication fails.
Keyword arguments:
- credentials -- A string containing the credentials to be used by the authenticator
"""
raise NotImplementedError(
"derived class '%s' must override this method" % self.__class__.__name__
)
def _open_sessionfile(self, session_id, mode="r"):
"""Open a session file for this instance in given mode"""
return open_cachefile(
"%s.asc" % session_id, mode, subdir="session/%s" % self.name
)
def _session_exists(self, session_id):
"""Check a session exists"""
return cachefile_exists("%s.asc" % session_id, subdir="session/%s" % self.name)
def _store_session(self, session_id, session_token):
"""Store a session to be able to use it later to reauthenticate"""
# We store a hash of the session_id and the session_token (the token is assumed to be secret)
to_hash = "{id}:{token}".format(id=session_id, token=session_token).encode()
hash_ = hashlib.sha256(to_hash).hexdigest()
with self._open_sessionfile(session_id, "w") as f:
f.write(hash_)
def authenticate_session(self, s_id, s_token):
try:
# Attempt to authenticate
self._authenticate_session(s_id, s_token)
except MoulinetteError:
raise
except Exception as e:
logger.exception(f"authentication {self.name} failed because '{e}'")
raise MoulinetteAuthenticationError("unable_authenticate")
def _authenticate_session(self, session_id, session_token):
"""Checks session and token against the stored session token"""
if not self._session_exists(session_id):
raise MoulinetteAuthenticationError("session_expired")
try:
# FIXME : shouldn't we also add a check that this session file
# is not too old ? e.g. not older than 24 hours ? idk...
with self._open_sessionfile(session_id, "r") as f:
stored_hash = f.read()
except IOError as e:
logger.debug("unable to retrieve session", exc_info=1)
raise MoulinetteAuthenticationError("unable_retrieve_session", exception=e)
else:
#
# session_id (or just id) : This is unique id for the current session from the user. Not too important
# if this info gets stolen somehow. It is stored in the client's side (browser) using regular cookies.
#
# session_token (or just token) : This is a secret info, like some sort of ephemeral password,
# used to authenticate the session without the user having to retype the password all the time...
# - It is generated on our side during the initial auth of the user (which happens with the actual admin password)
# - It is stored on the client's side (browser) using (signed) cookies.
# - We also store it on our side in the form of a hash of {id}:{token} (c.f. _store_session).
# We could simply store the raw token, but hashing it is an additonal low-cost security layer
# in case this info gets exposed for some reason (e.g. bad file perms for reasons...)
#
# When the user comes back, we fetch the session_id and session_token from its cookies. Then we
# re-hash the {id}:{token} and compare it to the previously stored hash for this session_id ...
# It it matches, then the user is authenticated. Otherwise, the token is invalid.
#
to_hash = "{id}:{token}".format(id=session_id, token=session_token).encode()
hash_ = hashlib.sha256(to_hash).hexdigest()
if not hmac.compare_digest(hash_, stored_hash):
raise MoulinetteAuthenticationError("invalid_token")
else:
return
def _clean_session(self, session_id):
"""Clean a session cache
Remove cache for the session 'session_id' and for this authenticator profile
Keyword arguments:
- session_id -- The session id to clean
"""
sessiondir = get_cachedir("session")
try:
os.remove(os.path.join(sessiondir, self.name, "%s.asc" % session_id))
except OSError:
pass

View file

@ -11,7 +11,7 @@ from moulinette.core import MoulinetteError
logger = logging.getLogger("moulinette.interface") logger = logging.getLogger("moulinette.interface")
GLOBAL_SECTION = "_global" # FIXME : are these even used for anything useful ...
TO_RETURN_PROP = "_to_return" TO_RETURN_PROP = "_to_return"
CALLBACKS_PROP = "_callbacks" CALLBACKS_PROP = "_callbacks"
@ -114,7 +114,7 @@ class BaseActionsMapParser(object):
"derived class '%s' must override this method" % self.__class__.__name__ "derived class '%s' must override this method" % self.__class__.__name__
) )
def auth_method(self, args, **kwargs): def auth_method(self, *args, **kwargs):
"""Check if authentication is required to run the requested action """Check if authentication is required to run the requested action
Keyword arguments: Keyword arguments:
@ -156,7 +156,7 @@ class BaseActionsMapParser(object):
): ):
raise MoulinetteError("invalid_usage") raise MoulinetteError("invalid_usage")
elif not tid: elif not tid:
tid = GLOBAL_SECTION tid = "_global"
# Prepare namespace # Prepare namespace
if namespace is None: if namespace is None:

View file

@ -61,7 +61,7 @@ def filter_csrf(callback):
class LogQueues(dict): class LogQueues(dict):
"""Map of session id to queue.""" """Map of session ids to queue."""
pass pass
@ -78,9 +78,10 @@ class APIQueueHandler(logging.Handler):
self.queues = LogQueues() self.queues = LogQueues()
def emit(self, record): def emit(self, record):
s_id = Session.get_infos()["id"]
sid = request.get_cookie("session.id") sid = request.get_cookie("session.id")
try: try:
queue = self.queues[sid] queue = self.queues[s_id]
except KeyError: except KeyError:
# Session is not initialized, abandon. # Session is not initialized, abandon.
return return
@ -209,6 +210,34 @@ class _HTTPArgumentParser(object):
raise MoulinetteValidationError(message, raw_msg=True) raise MoulinetteValidationError(message, raw_msg=True)
class Session():
secret = random_ascii()
actionsmap_name = None # This is later set to the actionsmap name
def set_infos(infos):
assert isinstance(infos, dict)
response.set_cookie(f"session.{Session.actionsmap_name}", infos, secure=True, secret=Session.secret)
def get_infos():
try:
infos = request.get_cookie(f"session.{Session.actionsmap_name}", secret=Session.secret, default={})
except Exception:
infos = {}
if "id" not in infos:
infos["id"] = random_ascii()
return infos
def delete_infos():
response.set_cookie(f"session.{Session.actionsmap_name}", "", max_age=-1)
class _ActionsMapPlugin(object): class _ActionsMapPlugin(object):
"""Actions map Bottle Plugin """Actions map Bottle Plugin
@ -228,8 +257,7 @@ class _ActionsMapPlugin(object):
self.actionsmap = actionsmap self.actionsmap = actionsmap
self.log_queues = log_queues self.log_queues = log_queues
# TODO: Save and load secrets? Session.actionsmap_name = actionsmap.name
self.secrets = {}
def setup(self, app): def setup(self, app):
"""Setup plugin on the application """Setup plugin on the application
@ -240,36 +268,6 @@ class _ActionsMapPlugin(object):
- app -- The application instance - app -- The application instance
""" """
# Login wrapper
def _login(callback):
def wrapper():
kwargs = {}
try:
kwargs["credentials"] = request.POST.credentials
except KeyError:
raise HTTPResponse("Missing credentials parameter", 400)
# Apparently even if the key doesn't exists, request.POST.foobar just returns empty string...
if not kwargs["credentials"]:
raise HTTPResponse("Missing credentials parameter", 400)
kwargs["profile"] = request.POST.get(
"profile", self.actionsmap.default_authentication
)
return callback(**kwargs)
return wrapper
# Logout wrapper
def _logout(callback):
def wrapper():
kwargs = {}
kwargs["profile"] = request.POST.get(
"profile", self.actionsmap.default_authentication
)
return callback(**kwargs)
return wrapper
# Append authentication routes # Append authentication routes
app.route( app.route(
@ -278,7 +276,6 @@ class _ActionsMapPlugin(object):
method="POST", method="POST",
callback=self.login, callback=self.login,
skip=["actionsmap"], skip=["actionsmap"],
apply=_login,
) )
app.route( app.route(
"/logout", "/logout",
@ -286,7 +283,6 @@ class _ActionsMapPlugin(object):
method="GET", method="GET",
callback=self.logout, callback=self.logout,
skip=["actionsmap"], skip=["actionsmap"],
apply=_logout,
) )
# Append messages route # Append messages route
@ -347,106 +343,61 @@ class _ActionsMapPlugin(object):
# Routes callbacks # Routes callbacks
def login(self, credentials, profile): def login(self):
"""Log in to an authenticator profile """Log in to an authenticator
Attempt to authenticate to a given authenticator profile and Attempt to authenticate to the default authenticator and
register it with the current session - a new one will be created register it with the current session - a new one will be created
if needed. if needed.
Keyword arguments:
- credentials -- Some credentials to use for login
- profile -- The authenticator profile name to log in
""" """
credentials = request.POST.credentials
# Apparently even if the key doesn't exists, request.POST.foobar just returns empty string...
if not credentials:
raise HTTPResponse("Missing credentials parameter", 400)
profile = request.POST.profile
if not profile:
profile = self.actionsmap.default_authentication
authenticator = self.actionsmap.get_authenticator(profile) authenticator = self.actionsmap.get_authenticator(profile)
try: try:
s_id, s_token = authenticator.authenticate_credentials(credentials, store_session=True) auth_info = authenticator.authenticate_credentials(credentials, store_session=True)
session_infos = Session.get_infos()
session_infos[profile] = auth_info
except MoulinetteError as e: except MoulinetteError as e:
try: try:
self.logout(profile) self.logout()
except Exception: except Exception:
pass pass
# FIXME : replace with MoulinetteAuthenticationError !? # FIXME : replace with MoulinetteAuthenticationError !?
raise HTTPResponse(e.strerror, 401) raise HTTPResponse(e.strerror, 401)
else: else:
# Save session id and token Session.set_infos(session_infos)
# Create and save (in RAM) new cookie secret used to secure(=sign?) the cookie
self.secrets[s_id] = s_secret = random_ascii()
# Fetch current token per profile
try:
s_tokens = request.get_cookie("session.tokens", secret=s_secret) or {}
except Exception:
# Same as for session.id a few lines before
s_tokens = {}
# Update dicts with new values
s_tokens[profile] = s_token
response.set_cookie("session.id", s_id, secure=True)
response.set_cookie(
"session.tokens", {""}, secure=True, secret=s_secret
)
return m18n.g("logged_in") return m18n.g("logged_in")
# This is called before each time a route is going to be processed # This is called before each time a route is going to be processed
def authenticate(self, authenticator): def authenticate(self, authenticator):
s_id = request.get_cookie("session.id")
try: try:
s_secret = self.secrets[s_id] session_infos = Session.get_infos()[authenticator.name]
s_token = request.get_cookie("session.tokens", secret=s_secret, default={})[
authenticator.name
]
except KeyError: except KeyError:
msg = m18n.g("authentication_required") msg = m18n.g("authentication_required")
raise HTTPResponse(msg, 401) raise HTTPResponse(msg, 401)
else:
authenticator.authenticate_session(s_id, s_token)
def logout(self, profile): return session_infos
"""Log out from an authenticator profile
Attempt to unregister a given profile - or all by default - from def logout(self):
the current session.
Keyword arguments:
- profile -- The authenticator profile name to log out
"""
# Retrieve session values
try: try:
s_id = request.get_cookie("session.id") or None Session.get_infos()
except: except KeyError:
# Super rare case where there are super weird cookie / cache issue raise HTTPResponse(m18n.g("not_logged_in"), 401)
# Previous line throws a CookieError that creates a 500 error ... else:
# So let's catch it and just use None... # Delete cookie and clean the session
s_id = None Session.delete_infos()
return m18n.g("logged_out")
if s_id is not None:
# We check that there's a (signed) session.hash available
# for additional security ?
# (An attacker could not craft such signed hashed ? (FIXME : need to make sure of this))
try:
s_secret = self.secrets[s_id]
except KeyError:
s_secret = {}
if profile not in request.get_cookie(
"session.tokens", secret=s_secret, default={}
):
raise HTTPResponse(m18n.g("not_logged_in"), 401)
else:
del self.secrets[s_id]
authenticator = self.actionsmap.get_authenticator(profile)
authenticator._clean_session(s_id)
# TODO: Clean the session for profile only
# Delete cookie and clean the session
response.set_cookie("session.tokens", "", max_age=-1)
return m18n.g("logged_out")
def messages(self): def messages(self):
"""Listen to the messages WebSocket stream """Listen to the messages WebSocket stream
@ -454,7 +405,7 @@ class _ActionsMapPlugin(object):
Retrieve the WebSocket stream and send to it each messages displayed by Retrieve the WebSocket stream and send to it each messages displayed by
the display method. They are JSON encoded as a dict { style: message }. the display method. They are JSON encoded as a dict { style: message }.
""" """
s_id = request.get_cookie("session.id") s_id = Session.get_infos()["id"]
try: try:
queue = self.log_queues[s_id] queue = self.log_queues[s_id]
except KeyError: except KeyError:
@ -515,7 +466,8 @@ class _ActionsMapPlugin(object):
finally: finally:
# Close opened WebSocket by putting StopIteration in the queue # Close opened WebSocket by putting StopIteration in the queue
try: try:
queue = self.log_queues[request.get_cookie("session.id")] s_id = Session.get_infos()["id"]
queue = self.log_queues[s_id]
except KeyError: except KeyError:
pass pass
else: else:
@ -523,7 +475,7 @@ class _ActionsMapPlugin(object):
def display(self, message, style): def display(self, message, style):
s_id = request.get_cookie("session.id") s_id = Sesson.get_infos()["id"]
try: try:
queue = self.log_queues[s_id] queue = self.log_queues[s_id]
except KeyError: except KeyError:
@ -659,7 +611,7 @@ class ActionsMapParser(BaseActionsMapParser):
# Return the created parser # Return the created parser
return parser return parser
def auth_method(self, args, route, **kwargs): def auth_method(self, _, route):
try: try:
# Retrieve the tid for the route # Retrieve the tid for the route

View file

@ -397,7 +397,7 @@ class ActionsMapParser(BaseActionsMapParser):
self.global_parser.add_argument(*names, **argument_options) self.global_parser.add_argument(*names, **argument_options)
def auth_method(self, args, **kwargs): def auth_method(self, args):
# FIXME? idk .. this try/except is duplicated from parse_args below # FIXME? idk .. this try/except is duplicated from parse_args below
# Just to be able to obtain the tid # Just to be able to obtain the tid
try: try:

View file

@ -3,6 +3,7 @@
# Global parameters # # Global parameters #
############################# #############################
_global: _global:
name: moulitest
authentication: authentication:
api: dummy api: dummy
cli: dummy cli: dummy

View file

@ -10,9 +10,8 @@ from moulinette.actionsmap import (
ActionsMap, ActionsMap,
) )
from moulinette.interfaces import GLOBAL_SECTION
from moulinette.core import MoulinetteError from moulinette.core import MoulinetteError
from moulinette import m18n from moulinette import m18n, msettings
@pytest.fixture @pytest.fixture
@ -74,6 +73,7 @@ def test_ask_parameter(iface, mocker):
from moulinette.core import Moulinette18n from moulinette.core import Moulinette18n
msettings["interface"] = iface
mocker.patch.object(Moulinette18n, "n", return_value="awesome_test") mocker.patch.object(Moulinette18n, "n", return_value="awesome_test")
mocker.patch.object(iface, "prompt", return_value="awesome_test") mocker.patch.object(iface, "prompt", return_value="awesome_test")
arg = ask("foobar", "a", None) arg = ask("foobar", "a", None)
@ -87,6 +87,7 @@ def test_password_parameter(iface, mocker):
from moulinette.core import Moulinette18n from moulinette.core import Moulinette18n
msettings["interface"] = iface
mocker.patch.object(Moulinette18n, "n", return_value="awesome_test") mocker.patch.object(Moulinette18n, "n", return_value="awesome_test")
mocker.patch.object(iface, "prompt", return_value="awesome_test") mocker.patch.object(iface, "prompt", return_value="awesome_test")
arg = ask("foobar", "a", None) arg = ask("foobar", "a", None)
@ -179,17 +180,17 @@ def test_extra_argument_parser_add_argument(iface):
assert extra_argument_parse._extra_params["Test"]["foo"]["ask"] == "lol" assert extra_argument_parse._extra_params["Test"]["foo"]["ask"] == "lol"
extra_argument_parse = ExtraArgumentParser(iface) extra_argument_parse = ExtraArgumentParser(iface)
extra_argument_parse.add_argument(GLOBAL_SECTION, "foo", {"ask": "lol"}) extra_argument_parse.add_argument("_global", "foo", {"ask": "lol"})
assert GLOBAL_SECTION in extra_argument_parse._extra_params assert "_global" in extra_argument_parse._extra_params
assert "foo" in extra_argument_parse._extra_params[GLOBAL_SECTION] assert "foo" in extra_argument_parse._extra_params["_global"]
assert "ask" in extra_argument_parse._extra_params[GLOBAL_SECTION]["foo"] assert "ask" in extra_argument_parse._extra_params["_global"]["foo"]
assert extra_argument_parse._extra_params[GLOBAL_SECTION]["foo"]["ask"] == "lol" assert extra_argument_parse._extra_params["_global"]["foo"]["ask"] == "lol"
def test_extra_argument_parser_add_argument_bad_arg(iface): def test_extra_argument_parser_add_argument_bad_arg(iface):
extra_argument_parse = ExtraArgumentParser(iface) extra_argument_parse = ExtraArgumentParser(iface)
with pytest.raises(MoulinetteError) as exception: with pytest.raises(MoulinetteError) as exception:
extra_argument_parse.add_argument(GLOBAL_SECTION, "foo", {"ask": 1}) extra_argument_parse.add_argument("_global", "foo", {"ask": 1})
expected_msg = "unable to validate extra parameter '%s' for argument '%s': %s" % ( expected_msg = "unable to validate extra parameter '%s' for argument '%s': %s" % (
"ask", "ask",
@ -199,23 +200,23 @@ def test_extra_argument_parser_add_argument_bad_arg(iface):
assert expected_msg in str(exception) assert expected_msg in str(exception)
extra_argument_parse = ExtraArgumentParser(iface) extra_argument_parse = ExtraArgumentParser(iface)
extra_argument_parse.add_argument(GLOBAL_SECTION, "foo", {"error": 1}) extra_argument_parse.add_argument("_global", "foo", {"error": 1})
assert GLOBAL_SECTION in extra_argument_parse._extra_params assert "_global" in extra_argument_parse._extra_params
assert "foo" in extra_argument_parse._extra_params[GLOBAL_SECTION] assert "foo" in extra_argument_parse._extra_params["_global"]
assert not len(extra_argument_parse._extra_params[GLOBAL_SECTION]["foo"]) assert not len(extra_argument_parse._extra_params["_global"]["foo"])
def test_extra_argument_parser_parse_args(iface, mocker): def test_extra_argument_parser_parse_args(iface, mocker):
extra_argument_parse = ExtraArgumentParser(iface) extra_argument_parse = ExtraArgumentParser(iface)
extra_argument_parse.add_argument(GLOBAL_SECTION, "foo", {"ask": "lol"}) extra_argument_parse.add_argument("_global", "foo", {"ask": "lol"})
extra_argument_parse.add_argument(GLOBAL_SECTION, "foo2", {"ask": "lol2"}) extra_argument_parse.add_argument("_global", "foo2", {"ask": "lol2"})
extra_argument_parse.add_argument( extra_argument_parse.add_argument(
GLOBAL_SECTION, "bar", {"password": "lul", "ask": "lul"} "_global", "bar", {"password": "lul", "ask": "lul"}
) )
args = extra_argument_parse.parse_args( args = extra_argument_parse.parse_args(
GLOBAL_SECTION, {"foo": 1, "foo2": ["a", "b", {"foobar": True}], "bar": "rab"} "_global", {"foo": 1, "foo2": ["a", "b", {"foobar": True}], "bar": "rab"}
) )
assert "foo" in args assert "foo" in args
@ -231,22 +232,32 @@ def test_extra_argument_parser_parse_args(iface, mocker):
def test_actions_map_api(): def test_actions_map_api():
from moulinette.interfaces.api import ActionsMapParser from moulinette.interfaces.api import ActionsMapParser
amap = ActionsMap(ActionsMapParser()) parser = ActionsMapParser()
amap = ActionsMap(parser)
assert amap.main_namespace == "moulitest" assert amap.main_namespace == "moulitest"
assert amap.default_authentication == "dummy" assert amap.default_authentication == "dummy"
assert ("GET", "/test-auth/default") in amap.parser.routes assert ("GET", "/test-auth/default") in amap.parser.routes
assert ("POST", "/test-auth/subcat/post") in amap.parser.routes assert ("POST", "/test-auth/subcat/post") in amap.parser.routes
assert parser.auth_method(None, ("GET", "/test-auth/default")) == "dummy"
assert parser.auth_method(None, ("GET", "/test-auth/only-api")) == "dummy"
assert parser.auth_method(None, ("GET", "/test-auth/only-cli")) is None
amap.generate_cache("moulitest") amap.generate_cache("moulitest")
amap = ActionsMap(ActionsMapParser()) parser = ActionsMapParser()
amap = ActionsMap(parser)
assert amap.main_namespace == "moulitest" assert amap.main_namespace == "moulitest"
assert amap.default_authentication == "dummy" assert amap.default_authentication == "dummy"
assert ("GET", "/test-auth/default") in amap.parser.routes assert ("GET", "/test-auth/default") in amap.parser.routes
assert ("POST", "/test-auth/subcat/post") in amap.parser.routes assert ("POST", "/test-auth/subcat/post") in amap.parser.routes
assert parser.auth_method(None, ("GET", "/test-auth/default")) == "dummy"
assert parser.auth_method(None, ("GET", "/test-auth/only-api")) == "dummy"
assert parser.auth_method(None, ("GET", "/test-auth/only-cli")) is None
def test_actions_map_import_error(mocker): def test_actions_map_import_error(mocker):
from moulinette.interfaces.api import ActionsMapParser from moulinette.interfaces.api import ActionsMapParser
@ -281,14 +292,16 @@ def test_actions_map_cli():
from moulinette.interfaces.cli import ActionsMapParser from moulinette.interfaces.cli import ActionsMapParser
import argparse import argparse
parser = argparse.ArgumentParser(add_help=False) top_parser = argparse.ArgumentParser(add_help=False)
parser.add_argument( top_parser.add_argument(
"--debug", "--debug",
action="store_true", action="store_true",
default=False, default=False,
help="Log and print debug messages", help="Log and print debug messages",
) )
amap = ActionsMap(ActionsMapParser(top_parser=parser))
parser = ActionsMapParser(top_parser=top_parser)
amap = ActionsMap(parser)
assert amap.main_namespace == "moulitest" assert amap.main_namespace == "moulitest"
assert amap.default_authentication == "dummy" assert amap.default_authentication == "dummy"
@ -304,9 +317,14 @@ def test_actions_map_cli():
.choices .choices
) )
assert parser.auth_method(["testauth", "default"]) == "dummy"
assert parser.auth_method(["testauth", "only-api"]) is None
assert parser.auth_method(["testauth", "only-cli"]) == "dummy"
amap.generate_cache("moulitest") amap.generate_cache("moulitest")
amap = ActionsMap(ActionsMapParser(top_parser=parser)) parser = ActionsMapParser(top_parser=top_parser)
amap = ActionsMap(parser)
assert amap.main_namespace == "moulitest" assert amap.main_namespace == "moulitest"
assert amap.default_authentication == "dummy" assert amap.default_authentication == "dummy"
@ -321,3 +339,7 @@ def test_actions_map_cli():
._actions[1] ._actions[1]
.choices .choices
) )
assert parser.auth_method(["testauth", "default"]) == "dummy"
assert parser.auth_method(["testauth", "only-api"]) is None
assert parser.auth_method(["testauth", "only-cli"]) == "dummy"

View file

@ -11,6 +11,7 @@ class TestAuthAPI:
password = "dummy" password = "dummy"
data = {"credentials": password} data = {"credentials": password}
if profile: if profile:
data["profile"] = profile data["profile"] = profile
@ -67,13 +68,7 @@ class TestAuthAPI:
def test_login(self, moulinette_webapi): def test_login(self, moulinette_webapi):
assert self.login(moulinette_webapi).text == "Logged in" assert self.login(moulinette_webapi).text == "Logged in"
assert "session.id" in moulinette_webapi.cookies assert "session.moulitest" in moulinette_webapi.cookies
assert "session.tokens" in moulinette_webapi.cookies
cache_session_default = os.environ["MOULINETTE_CACHE_DIR"] + "/session/dummy/"
assert moulinette_webapi.cookies["session.id"] + ".asc" in os.listdir(
cache_session_default
)
def test_login_bad_password(self, moulinette_webapi): def test_login_bad_password(self, moulinette_webapi):
assert ( assert (
@ -81,8 +76,7 @@ class TestAuthAPI:
== "Invalid password" == "Invalid password"
) )
assert "session.id" not in moulinette_webapi.cookies assert "session.moulitest" not in moulinette_webapi.cookies
assert "session.tokens" not in moulinette_webapi.cookies
def test_login_csrf_attempt(self, moulinette_webapi): def test_login_csrf_attempt(self, moulinette_webapi):
# C.f. # C.f.
@ -93,8 +87,7 @@ class TestAuthAPI:
"CSRF protection" "CSRF protection"
in self.login(moulinette_webapi, csrf=True, status=403).text in self.login(moulinette_webapi, csrf=True, status=403).text
) )
assert not any(c.name == "session.id" for c in moulinette_webapi.cookiejar) assert not any(c.name == "session.moulitest" for c in moulinette_webapi.cookiejar)
assert not any(c.name == "session.tokens" for c in moulinette_webapi.cookiejar)
def test_login_then_legit_request_without_cookies(self, moulinette_webapi): def test_login_then_legit_request_without_cookies(self, moulinette_webapi):
self.login(moulinette_webapi) self.login(moulinette_webapi)
@ -106,6 +99,11 @@ class TestAuthAPI:
def test_login_then_legit_request(self, moulinette_webapi): def test_login_then_legit_request(self, moulinette_webapi):
self.login(moulinette_webapi) self.login(moulinette_webapi)
assert "session.moulitest" in moulinette_webapi.cookies
print("====================cookie")
print(moulinette_webapi.cookiejar)
print(moulinette_webapi.cookies)
assert ( assert (
moulinette_webapi.get("/test-auth/default", status=200).text moulinette_webapi.get("/test-auth/default", status=200).text
== '"some_data_from_default"' == '"some_data_from_default"'
@ -121,11 +119,6 @@ class TestAuthAPI:
moulinette_webapi.get("/logout", status=200) moulinette_webapi.get("/logout", status=200)
cache_session_default = os.environ["MOULINETTE_CACHE_DIR"] + "/session/dummy/"
assert not moulinette_webapi.cookies["session.id"] + ".asc" in os.listdir(
cache_session_default
)
assert ( assert (
moulinette_webapi.get("/test-auth/default", status=401).text moulinette_webapi.get("/test-auth/default", status=401).text
== "Authentication required" == "Authentication required"
@ -134,15 +127,7 @@ class TestAuthAPI:
def test_login_other_profile(self, moulinette_webapi): def test_login_other_profile(self, moulinette_webapi):
self.login(moulinette_webapi, profile="yoloswag", password="yoloswag") self.login(moulinette_webapi, profile="yoloswag", password="yoloswag")
assert "session.id" in moulinette_webapi.cookies assert "session.moulitest" in moulinette_webapi.cookies
assert "session.tokens" in moulinette_webapi.cookies
cache_session_default = (
os.environ["MOULINETTE_CACHE_DIR"] + "/session/yoloswag/"
)
assert moulinette_webapi.cookies["session.id"] + ".asc" in os.listdir(
cache_session_default
)
def test_login_wrong_profile(self, moulinette_webapi): def test_login_wrong_profile(self, moulinette_webapi):
self.login(moulinette_webapi) self.login(moulinette_webapi)