Naive rewrite of the Authenticator methods to avoid the mystic __call__ stuff, improve semantic

This commit is contained in:
Alexandre Aubin 2021-06-15 17:57:41 +02:00
parent 4345081e6f
commit f792166581
3 changed files with 116 additions and 147 deletions

View file

@ -32,7 +32,33 @@ class BaseAuthenticator(object):
# Virtual methods
# Each authenticator classes must implement these methods.
def authenticate(self, credentials=None):
def authenticate_credentials(self, credentials=None, store_session=False):
try:
# Attempt to authenticate
self.authenticate(credentials)
except MoulinetteError:
raise
except Exception as e:
logger.exception(f"authentication {self.name} failed because '{e}'")
raise MoulinetteAuthenticationError("unable_authenticate")
# Store session for later using the provided (new) token if any
if store_session:
try:
s_id = random_ascii()
s_token = random_ascii()
self._store_session(s_id, s_token)
except Exception as e:
import traceback
traceback.print_exc()
logger.exception(f"unable to store session because {e}")
else:
logger.debug("session has been stored")
def _authenticate_credentials(self, credentials=None):
"""Attempt to authenticate
Attempt to authenticate with given credentials. It should raise an
@ -46,84 +72,6 @@ class BaseAuthenticator(object):
"derived class '%s' must override this method" % self.__class__.__name__
)
# Authentication methods
def __call__(self, credentials=None, token=None):
"""Attempt to authenticate
Attempt to authenticate either with credentials or with session
token if 'credentials' is None. If the authentication succeed, the
instance is returned and the session is registered for the token
if 'token' and 'credentials' are given.
The token is composed by the session identifier and a session
hash (the "true token") - to use for encryption - as a 2-tuple.
Keyword arguments:
- credentials -- A string containing the credentials to be used by the authenticator
- token -- The session token in the form of (id, hash)
"""
if hasattr(self, "is_authenticated"):
return self.is_authenticated
is_authenticated = False
#
# Authenticate using the credentials
#
if credentials:
try:
# Attempt to authenticate
self.authenticate(credentials)
except MoulinetteError:
raise
except Exception as e:
logger.exception(f"authentication {self.name} failed because '{e}'")
raise MoulinetteAuthenticationError("unable_authenticate")
else:
is_authenticated = True
# Store session for later using the provided (new) token if any
if token:
try:
s_id, s_token = token
self._store_session(s_id, s_token)
except Exception as e:
import traceback
traceback.print_exc()
logger.exception(f"unable to store session because {e}")
else:
logger.debug("session has been stored")
#
# Authenticate using the token provided
#
elif token:
try:
s_id, s_token = token
# Attempt to authenticate
self._authenticate_session(s_id, s_token)
except MoulinetteError:
raise
except Exception as e:
logger.exception(f"authentication {self.name} failed because '{e}'")
raise MoulinetteAuthenticationError("unable_authenticate")
else:
is_authenticated = True
#
# No credentials given, can't authenticate
#
else:
raise MoulinetteAuthenticationError("unable_authenticate")
self.is_authenticated = is_authenticated
return is_authenticated
# Private methods
def _open_sessionfile(self, session_id, mode="r"):
"""Open a session file for this instance in given mode"""
return open_cachefile(
@ -143,6 +91,16 @@ class BaseAuthenticator(object):
with self._open_sessionfile(session_id, "w") as f:
f.write(hash_)
def authenticate_session(s_id, s_token):
try:
# Attempt to authenticate
self._authenticate_session(s_id, s_token)
except MoulinetteError:
raise
except Exception as e:
logger.exception(f"authentication {self.name} failed because '{e}'")
raise MoulinetteAuthenticationError("unable_authenticate")
def _authenticate_session(self, session_id, session_token):
"""Checks session and token against the stored session token"""
if not self._session_exists(session_id):

View file

@ -359,49 +359,67 @@ class _ActionsMapPlugin(object):
- profile -- The authenticator profile name to log in
"""
# Retrieve session values
try:
s_id = request.get_cookie("session.id") or random_ascii()
except:
# Super rare case where there are super weird cookie / cache issue
# Previous line throws a CookieError that creates a 500 error ...
# So let's catch it and just use a fresh ID then...
s_id = random_ascii()
authenticator = self.actionsmap.get_authenticator(profile)
##################################################################
# Case 1 : credentials were provided #
# We want to validate that the credentials are right #
# Then save the session id/token, and return then in the cookies #
##################################################################
try:
s_secret = self.secrets[s_id]
except KeyError:
s_tokens = {}
else:
try:
s_tokens = request.get_cookie("session.tokens", secret=s_secret) or {}
except:
# Same as for session.id a few lines before
s_tokens = {}
s_new_token = random_ascii()
try:
# Attempt to authenticate
authenticator = self.actionsmap.get_authenticator(profile)
authenticator(credentials, token=(s_id, s_new_token))
s_id, s_token = authenticator.authenticate_credentials(credentials, store_session=True)
except MoulinetteError as e:
if len(s_tokens) > 0:
try:
self.logout(profile)
except:
pass
try:
self.logout(profile)
except Exception:
pass
# FIXME : replace with MoulinetteAuthenticationError !?
raise HTTPResponse(e.strerror, 401)
else:
# Update dicts with new values
s_tokens[profile] = s_new_token
# Save session id and token
# Create and save (in RAM) new cookie secret used to secure(=sign?) the cookie
self.secrets[s_id] = s_secret = random_ascii()
# Fetch current token per profile
try:
s_tokens = request.get_cookie("session.tokens", secret=s_secret) or {}
except Exception:
# Same as for session.id a few lines before
s_tokens = {}
# Update dicts with new values
s_tokens[profile] = s_token
response.set_cookie("session.id", s_id, secure=True)
response.set_cookie(
"session.tokens", s_tokens, secure=True, secret=s_secret
"session.tokens", {""}, secure=True, secret=s_secret
)
return m18n.g("logged_in")
# This is called before each time a route is going to be processed
def _do_authenticate(self, authenticator):
"""Process the authentication
Handle the core.MoulinetteSignals.authenticate signal.
"""
s_id = request.get_cookie("session.id")
try:
s_secret = self.secrets[s_id]
s_token = request.get_cookie("session.tokens", secret=s_secret, default={})[
authenticator.name
]
except KeyError:
msg = m18n.g("authentication_required")
raise HTTPResponse(msg, 401)
else:
authenticator.authenticate_session(s_id, s_token)
def logout(self, profile):
"""Log out from an authenticator profile
@ -412,25 +430,35 @@ class _ActionsMapPlugin(object):
- profile -- The authenticator profile name to log out
"""
s_id = request.get_cookie("session.id")
# We check that there's a (signed) session.hash available
# for additional security ?
# (An attacker could not craft such signed hashed ? (FIXME : need to make sure of this))
# Retrieve session values
try:
s_secret = self.secrets[s_id]
except KeyError:
s_secret = {}
if profile not in request.get_cookie(
"session.tokens", secret=s_secret, default={}
):
raise HTTPResponse(m18n.g("not_logged_in"), 401)
else:
del self.secrets[s_id]
authenticator = self.actionsmap.get_authenticator(profile)
authenticator._clean_session(s_id)
# TODO: Clean the session for profile only
# Delete cookie and clean the session
response.set_cookie("session.tokens", "", max_age=-1)
s_id = request.get_cookie("session.id") or None
except:
# Super rare case where there are super weird cookie / cache issue
# Previous line throws a CookieError that creates a 500 error ...
# So let's catch it and just use None...
s_id = None
if s_id is not None:
# We check that there's a (signed) session.hash available
# for additional security ?
# (An attacker could not craft such signed hashed ? (FIXME : need to make sure of this))
try:
s_secret = self.secrets[s_id]
except KeyError:
s_secret = {}
if profile not in request.get_cookie(
"session.tokens", secret=s_secret, default={}
):
raise HTTPResponse(m18n.g("not_logged_in"), 401)
else:
del self.secrets[s_id]
authenticator = self.actionsmap.get_authenticator(profile)
authenticator._clean_session(s_id)
# TODO: Clean the session for profile only
# Delete cookie and clean the session
response.set_cookie("session.tokens", "", max_age=-1)
return m18n.g("logged_out")
def messages(self):
@ -510,24 +538,6 @@ class _ActionsMapPlugin(object):
# Signals handlers
def _do_authenticate(self, authenticator):
"""Process the authentication
Handle the core.MoulinetteSignals.authenticate signal.
"""
s_id = request.get_cookie("session.id")
try:
s_secret = self.secrets[s_id]
s_token = request.get_cookie("session.tokens", secret=s_secret, default={})[
authenticator.name
]
except KeyError:
msg = m18n.g("authentication_required")
raise HTTPResponse(msg, 401)
else:
return authenticator(token=(s_id, s_token))
def _do_display(self, message, style):
"""Display a message

View file

@ -538,7 +538,8 @@ class Interface(BaseInterface):
# moulinette is used to create a CLI for non-root user that needs to
# auth somehow but hmpf -.-
msg = m18n.g("password")
return authenticator(credentials=self._do_prompt(msg, True, False, color="yellow"))
credentials = self._do_prompt(msg, True, False, color="yellow")
return authenticator.authenticate_credentials(credentials=credentials)
def _do_prompt(self, message, is_password, confirm, color="blue"):
"""Prompt for a value