mirror of
https://github.com/YunoHost/moulinette.git
synced 2024-09-03 20:06:31 +02:00
602 lines
21 KiB
Python
602 lines
21 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import re
|
|
import logging
|
|
import argparse
|
|
import copy
|
|
import datetime
|
|
from collections import deque, OrderedDict
|
|
from json.encoder import JSONEncoder
|
|
from typing import Optional
|
|
|
|
from moulinette import m18n
|
|
from moulinette.core import MoulinetteError
|
|
|
|
logger = logging.getLogger("moulinette.interface")
|
|
|
|
# FIXME : are these even used for anything useful ...
|
|
TO_RETURN_PROP = "_to_return"
|
|
CALLBACKS_PROP = "_callbacks"
|
|
|
|
|
|
# Base Class -----------------------------------------------------------
|
|
|
|
|
|
class BaseActionsMapParser:
|
|
|
|
"""Actions map's base Parser
|
|
|
|
Each interfaces must implement an ActionsMapParser class derived
|
|
from this class which must overrides virtual properties and methods.
|
|
It is used to parse the main parts of the actions map (i.e. global
|
|
arguments, categories and actions). It implements methods to set/get
|
|
the global and actions configuration.
|
|
|
|
Keyword arguments:
|
|
- parent -- A parent BaseActionsMapParser derived object
|
|
|
|
"""
|
|
|
|
def __init__(self, parent=None, **kwargs):
|
|
if not parent:
|
|
logger.debug("initializing base actions map parser for %s", self.interface)
|
|
|
|
# Virtual properties
|
|
# Each parser classes must implement these properties.
|
|
|
|
"""The name of the interface for which it is the parser"""
|
|
interface: Optional[str] = None
|
|
|
|
# Virtual methods
|
|
# Each parser classes must implement these methods.
|
|
|
|
@staticmethod
|
|
def format_arg_names(name, full):
|
|
"""Format argument name
|
|
|
|
Format agument name depending on its 'full' parameter and return
|
|
a list of strings which will be used as name or option strings
|
|
for the argument parser.
|
|
|
|
Keyword arguments:
|
|
- name -- The argument name
|
|
- full -- The argument's 'full' parameter
|
|
|
|
Returns:
|
|
A list of option strings
|
|
|
|
"""
|
|
raise NotImplementedError("derived class must override this method")
|
|
|
|
def has_global_parser(self):
|
|
return False
|
|
|
|
def add_global_parser(self, **kwargs):
|
|
"""Add a parser for global arguments
|
|
|
|
Create and return an argument parser for global arguments.
|
|
|
|
Returns:
|
|
An ArgumentParser based object
|
|
|
|
"""
|
|
raise NotImplementedError(
|
|
"derived class '%s' must override this method" % self.__class__.__name__
|
|
)
|
|
|
|
def add_category_parser(self, name, **kwargs):
|
|
"""Add a parser for a category
|
|
|
|
Create a new category and return a parser for it.
|
|
|
|
Keyword arguments:
|
|
- name -- The category name
|
|
|
|
Returns:
|
|
A BaseParser based object
|
|
|
|
"""
|
|
raise NotImplementedError(
|
|
"derived class '%s' must override this method" % self.__class__.__name__
|
|
)
|
|
|
|
def add_action_parser(self, name, tid, **kwargs):
|
|
"""Add a parser for an action
|
|
|
|
Create a new action and return an argument parser for it.
|
|
|
|
Keyword arguments:
|
|
- name -- The action name
|
|
- tid -- The tuple identifier of the action
|
|
|
|
Returns:
|
|
An ArgumentParser based object
|
|
|
|
"""
|
|
raise NotImplementedError(
|
|
"derived class '%s' must override this method" % self.__class__.__name__
|
|
)
|
|
|
|
def auth_method(self, *args, **kwargs):
|
|
"""Check if authentication is required to run the requested action
|
|
|
|
Keyword arguments:
|
|
- args -- Arguments string or dict (TODO)
|
|
|
|
Returns:
|
|
False, or the authentication profile required
|
|
|
|
"""
|
|
raise NotImplementedError(
|
|
"derived class '%s' must override this method" % self.__class__.__name__
|
|
)
|
|
|
|
def parse_args(self, args, **kwargs):
|
|
"""Parse arguments
|
|
|
|
Convert argument variables to objects and assign them as
|
|
attributes of the namespace.
|
|
|
|
Keyword arguments:
|
|
- args -- Arguments string or dict (TODO)
|
|
|
|
Returns:
|
|
The populated namespace
|
|
|
|
"""
|
|
raise NotImplementedError(
|
|
"derived class '%s' must override this method" % self.__class__.__name__
|
|
)
|
|
|
|
# Arguments helpers
|
|
|
|
@staticmethod
|
|
def prepare_action_namespace(tid, namespace=None):
|
|
"""Prepare the namespace for a given action"""
|
|
# Validate tid and namespace
|
|
if not isinstance(tid, tuple) and (
|
|
namespace is None or not hasattr(namespace, TO_RETURN_PROP)
|
|
):
|
|
raise MoulinetteError("invalid_usage")
|
|
elif not tid:
|
|
tid = "_global"
|
|
|
|
# Prepare namespace
|
|
if namespace is None:
|
|
namespace = argparse.Namespace()
|
|
namespace._tid = tid
|
|
|
|
return namespace
|
|
|
|
|
|
# Argument parser ------------------------------------------------------
|
|
|
|
|
|
class _CallbackAction(argparse.Action):
|
|
def __init__(
|
|
self,
|
|
option_strings,
|
|
dest,
|
|
nargs=0,
|
|
callback={},
|
|
default=argparse.SUPPRESS,
|
|
help=None,
|
|
):
|
|
if not callback or "method" not in callback:
|
|
raise ValueError("callback must be provided with at least " "a method key")
|
|
super(_CallbackAction, self).__init__(
|
|
option_strings=option_strings,
|
|
dest=dest,
|
|
nargs=nargs,
|
|
default=default,
|
|
help=help,
|
|
)
|
|
self.callback_method = callback.get("method")
|
|
self.callback_kwargs = callback.get("kwargs", {})
|
|
self.callback_return = callback.get("return", False)
|
|
|
|
@property
|
|
def callback(self):
|
|
if not hasattr(self, "_callback"):
|
|
self._retrieve_callback()
|
|
return self._callback
|
|
|
|
def _retrieve_callback(self):
|
|
# Attempt to retrieve callback method
|
|
mod_name, func_name = (self.callback_method).rsplit(".", 1)
|
|
try:
|
|
mod = __import__(mod_name, globals=globals(), level=0, fromlist=[func_name])
|
|
func = getattr(mod, func_name)
|
|
except (AttributeError, ImportError):
|
|
import traceback
|
|
|
|
traceback.print_exc()
|
|
raise ValueError("unable to import method {}".format(self.callback_method))
|
|
self._callback = func
|
|
|
|
def __call__(self, parser, namespace, values, option_string=None):
|
|
parser.enqueue_callback(namespace, self, values)
|
|
if self.callback_return:
|
|
setattr(namespace, TO_RETURN_PROP, {})
|
|
|
|
def execute(self, namespace, values):
|
|
try:
|
|
# Execute callback and get returned value
|
|
value = self.callback(namespace, values, **self.callback_kwargs)
|
|
except Exception as e:
|
|
error_message = (
|
|
"cannot get value from callback method "
|
|
"'{}': {}".format(self.callback_method, e)
|
|
)
|
|
logger.exception(error_message)
|
|
raise MoulinetteError(error_message, raw_msg=True)
|
|
else:
|
|
if value:
|
|
if self.callback_return:
|
|
setattr(namespace, TO_RETURN_PROP, value)
|
|
else:
|
|
setattr(namespace, self.dest, value)
|
|
|
|
|
|
class _ExtendedSubParsersAction(argparse._SubParsersAction):
|
|
|
|
"""Subparsers with extended properties for argparse
|
|
|
|
It provides the following additional properties at initialization,
|
|
e.g. using `parser.add_subparsers`:
|
|
- required -- Either the subparser is required or not (default: False)
|
|
|
|
It also provides the following additional properties for parsers,
|
|
e.g. using `subparsers.add_parser`:
|
|
- deprecated -- Wether the command is deprecated
|
|
- deprecated_alias -- A list of deprecated command alias names
|
|
|
|
"""
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
required = kwargs.pop("required", False)
|
|
super(_ExtendedSubParsersAction, self).__init__(*args, **kwargs)
|
|
|
|
self.required = required
|
|
self._deprecated_command_map = {}
|
|
|
|
def add_parser(self, name, type_=None, **kwargs):
|
|
deprecated = kwargs.pop("deprecated", False)
|
|
deprecated_alias = kwargs.pop("deprecated_alias", [])
|
|
|
|
if deprecated:
|
|
self._deprecated_command_map[name] = None
|
|
if "help" in kwargs:
|
|
del kwargs["help"]
|
|
|
|
parser = super(_ExtendedSubParsersAction, self).add_parser(name, **kwargs)
|
|
|
|
# Append each deprecated command alias name
|
|
for command in deprecated_alias:
|
|
self._deprecated_command_map[command] = name
|
|
self._name_parser_map[command] = parser
|
|
|
|
parser.type = type_
|
|
|
|
return parser
|
|
|
|
def __call__(self, parser, namespace, values, option_string=None):
|
|
parser_name = values[0]
|
|
|
|
try:
|
|
# Check for deprecated command name
|
|
correct_name = self._deprecated_command_map[parser_name]
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
# Warn the user about deprecated command
|
|
if correct_name is None:
|
|
logger.warning(
|
|
m18n.g("deprecated_command", prog=parser.prog, command=parser_name)
|
|
)
|
|
else:
|
|
logger.warning(
|
|
m18n.g(
|
|
"deprecated_command_alias",
|
|
old=parser_name,
|
|
new=correct_name,
|
|
prog=parser.prog,
|
|
)
|
|
)
|
|
values[0] = correct_name
|
|
|
|
return super(_ExtendedSubParsersAction, self).__call__(
|
|
parser, namespace, values, option_string
|
|
)
|
|
|
|
|
|
class ExtendedArgumentParser(argparse.ArgumentParser):
|
|
def __init__(self, *args, **kwargs):
|
|
super(ExtendedArgumentParser, self).__init__(
|
|
formatter_class=PositionalsFirstHelpFormatter, *args, **kwargs
|
|
)
|
|
|
|
# Register additional actions
|
|
self.register("action", "callback", _CallbackAction)
|
|
self.register("action", "parsers", _ExtendedSubParsersAction)
|
|
|
|
def enqueue_callback(self, namespace, callback, values):
|
|
queue = self._get_callbacks_queue(namespace)
|
|
queue.append((callback, values))
|
|
|
|
def dequeue_callbacks(self, namespace):
|
|
queue = self._get_callbacks_queue(namespace, False)
|
|
for _i in range(len(queue)):
|
|
c, v = queue.popleft()
|
|
# FIXME: break dequeue if callback returns
|
|
c.execute(namespace, v)
|
|
try:
|
|
delattr(namespace, CALLBACKS_PROP)
|
|
except:
|
|
pass
|
|
|
|
def _get_callbacks_queue(self, namespace, create=True):
|
|
try:
|
|
queue = getattr(namespace, CALLBACKS_PROP)
|
|
except AttributeError:
|
|
if create:
|
|
queue = deque()
|
|
setattr(namespace, CALLBACKS_PROP, queue)
|
|
else:
|
|
queue = list()
|
|
return queue
|
|
|
|
def add_arguments(
|
|
self, arguments, extraparser, format_arg_names=None, validate_extra=True
|
|
):
|
|
for argument_name, argument_options in arguments.items():
|
|
# will adapt arguments name for cli or api context
|
|
names = format_arg_names(
|
|
str(argument_name), argument_options.pop("full", None)
|
|
)
|
|
|
|
if "type" in argument_options:
|
|
argument_options["type"] = eval(argument_options["type"])
|
|
|
|
if "extra" in argument_options:
|
|
extra = argument_options.pop("extra")
|
|
argument_dest = self.add_argument(*names, **argument_options).dest
|
|
extraparser.add_argument(
|
|
self.get_default("_tid"), argument_dest, extra, validate_extra
|
|
)
|
|
continue
|
|
|
|
self.add_argument(*names, **argument_options)
|
|
|
|
def _get_nargs_pattern(self, action):
|
|
if action.nargs == argparse.PARSER and not action.required:
|
|
return "([-AO]*)"
|
|
else:
|
|
return super(ExtendedArgumentParser, self)._get_nargs_pattern(action)
|
|
|
|
def _get_values(self, action, arg_strings):
|
|
if action.nargs == argparse.PARSER and not action.required:
|
|
value = [self._get_value(action, v) for v in arg_strings]
|
|
if value:
|
|
self._check_value(action, value[0])
|
|
else:
|
|
value = argparse.SUPPRESS
|
|
else:
|
|
value = super(ExtendedArgumentParser, self)._get_values(action, arg_strings)
|
|
return value
|
|
|
|
# Adapted from :
|
|
# https://github.com/python/cpython/blob/af26c15110b76195e62a06d17e39176d42c0511c/Lib/argparse.py#L2293-L2314
|
|
def format_help(self):
|
|
formatter = self._get_formatter()
|
|
|
|
# usage
|
|
formatter.add_usage(self.usage, self._actions, self._mutually_exclusive_groups)
|
|
|
|
# description
|
|
formatter.add_text(self.description)
|
|
|
|
# positionals, optionals and user-defined groups
|
|
for action_group in self._action_groups:
|
|
|
|
# Dirty hack to separate 'subcommands'
|
|
# into 'actions' and 'subcategories'
|
|
if action_group.title == "subcommands":
|
|
|
|
# Make a copy of the "action group actions"...
|
|
choice_actions = action_group._group_actions[0]._choices_actions
|
|
actions_subparser = copy.copy(action_group._group_actions[0])
|
|
subcategories_subparser = copy.copy(action_group._group_actions[0])
|
|
|
|
# Filter "action"-type and "subcategory"-type commands
|
|
actions_subparser.choices = OrderedDict(
|
|
[
|
|
(k, v)
|
|
for k, v in actions_subparser.choices.items()
|
|
if v.type == "action"
|
|
]
|
|
)
|
|
subcategories_subparser.choices = OrderedDict(
|
|
[
|
|
(k, v)
|
|
for k, v in subcategories_subparser.choices.items()
|
|
if v.type == "subcategory"
|
|
]
|
|
)
|
|
|
|
actions_choices = actions_subparser.choices.keys()
|
|
subcategories_choices = subcategories_subparser.choices.keys()
|
|
|
|
actions_subparser._choices_actions = [
|
|
c for c in choice_actions if c.dest in actions_choices
|
|
]
|
|
subcategories_subparser._choices_actions = [
|
|
c for c in choice_actions if c.dest in subcategories_choices
|
|
]
|
|
|
|
# Display each section (actions and subcategories)
|
|
if actions_choices:
|
|
formatter.start_section("actions")
|
|
formatter.add_arguments([actions_subparser])
|
|
formatter.end_section()
|
|
|
|
if subcategories_choices:
|
|
formatter.start_section("subcategories")
|
|
formatter.add_arguments([subcategories_subparser])
|
|
formatter.end_section()
|
|
|
|
else:
|
|
formatter.start_section(action_group.title)
|
|
formatter.add_text(action_group.description)
|
|
formatter.add_arguments(action_group._group_actions)
|
|
formatter.end_section()
|
|
|
|
# epilog
|
|
formatter.add_text(self.epilog)
|
|
|
|
# determine help from format above
|
|
return formatter.format_help()
|
|
|
|
|
|
# This is copy-pasta from the original argparse.HelpFormatter :
|
|
# https://github.com/python/cpython/blob/1e73dbbc29c96d0739ffef92db36f63aa1aa30da/Lib/argparse.py#L293-L383
|
|
# tweaked to display positional arguments first in usage/--help
|
|
#
|
|
# This is motivated by the "bug" / inconsistent behavior described here :
|
|
# http://bugs.python.org/issue9338
|
|
# and fix is inspired from here :
|
|
# https://stackoverflow.com/questions/26985650/argparse-do-not-catch-positional-arguments-with-nargs/26986546#26986546
|
|
class PositionalsFirstHelpFormatter(argparse.HelpFormatter):
|
|
def _format_usage(self, usage, actions, groups, prefix):
|
|
if prefix is None:
|
|
# TWEAK : not using gettext here...
|
|
prefix = "usage: "
|
|
|
|
# if usage is specified, use that
|
|
if usage is not None:
|
|
usage = usage % dict(prog=self._prog)
|
|
|
|
# if no optionals or positionals are available, usage is just prog
|
|
elif usage is None and not actions:
|
|
usage = "%(prog)s" % dict(prog=self._prog)
|
|
|
|
# if optionals and positionals are available, calculate usage
|
|
elif usage is None:
|
|
prog = "%(prog)s" % dict(prog=self._prog)
|
|
|
|
# split optionals from positionals
|
|
optionals = []
|
|
positionals = []
|
|
for action in actions:
|
|
if action.option_strings:
|
|
optionals.append(action)
|
|
else:
|
|
positionals.append(action)
|
|
|
|
# build full usage string
|
|
format = self._format_actions_usage
|
|
# TWEAK here : positionals first
|
|
action_usage = format(positionals + optionals, groups)
|
|
usage = " ".join([s for s in [prog, action_usage] if s])
|
|
|
|
# wrap the usage parts if it's too long
|
|
text_width = self._width - self._current_indent
|
|
if len(prefix) + len(usage) > text_width:
|
|
|
|
# break usage into wrappable parts
|
|
part_regexp = r"\(.*?\)+|\[.*?\]+|\S+"
|
|
opt_usage = format(optionals, groups)
|
|
pos_usage = format(positionals, groups)
|
|
opt_parts = re.findall(part_regexp, opt_usage)
|
|
pos_parts = re.findall(part_regexp, pos_usage)
|
|
assert " ".join(opt_parts) == opt_usage
|
|
assert " ".join(pos_parts) == pos_usage
|
|
|
|
# helper for wrapping lines
|
|
def get_lines(parts, indent, prefix=None):
|
|
lines = []
|
|
line = []
|
|
if prefix is not None:
|
|
line_len = len(prefix) - 1
|
|
else:
|
|
line_len = len(indent) - 1
|
|
for part in parts:
|
|
if line_len + 1 + len(part) > text_width:
|
|
lines.append(indent + " ".join(line))
|
|
line = []
|
|
line_len = len(indent) - 1
|
|
line.append(part)
|
|
line_len += len(part) + 1
|
|
if line:
|
|
lines.append(indent + " ".join(line))
|
|
if prefix is not None:
|
|
lines[0] = lines[0][len(indent) :]
|
|
return lines
|
|
|
|
# if prog is short, follow it with optionals or positionals
|
|
if len(prefix) + len(prog) <= 0.75 * text_width:
|
|
indent = " " * (len(prefix) + len(prog) + 1)
|
|
# START TWEAK : pos_parts first, then opt_parts
|
|
if pos_parts:
|
|
lines = get_lines([prog] + pos_parts, indent, prefix)
|
|
lines.extend(get_lines(opt_parts, indent))
|
|
elif opt_parts:
|
|
lines = get_lines([prog] + opt_parts, indent, prefix)
|
|
# END TWEAK
|
|
else:
|
|
lines = [prog]
|
|
|
|
# if prog is long, put it on its own line
|
|
else:
|
|
indent = " " * len(prefix)
|
|
parts = pos_parts + opt_parts
|
|
lines = get_lines(parts, indent)
|
|
if len(lines) > 1:
|
|
lines = []
|
|
# TWEAK here : pos_parts first, then opt_part
|
|
lines.extend(get_lines(pos_parts, indent))
|
|
lines.extend(get_lines(opt_parts, indent))
|
|
lines = [prog] + lines
|
|
|
|
# join lines into usage
|
|
usage = "\n".join(lines)
|
|
|
|
# prefix with 'usage:'
|
|
return "{}{}\n\n".format(prefix, usage)
|
|
|
|
|
|
class JSONExtendedEncoder(JSONEncoder):
|
|
|
|
"""Extended JSON encoder
|
|
|
|
Extend default JSON encoder to recognize more types and classes. It will
|
|
never raise an exception if the object can't be encoded and return its repr
|
|
instead.
|
|
|
|
The following objects and types are supported:
|
|
- set: converted into list
|
|
|
|
"""
|
|
|
|
def default(self, o):
|
|
|
|
import pytz # Lazy loading, this takes like 3+ sec on a RPi2 ?!
|
|
|
|
"""Return a serializable object"""
|
|
# Convert compatible containers into list
|
|
if isinstance(o, set) or (hasattr(o, "__iter__") and hasattr(o, "next")):
|
|
return list(o)
|
|
|
|
# Display the date in its iso format ISO-8601 Internet Profile (RFC 3339)
|
|
if isinstance(o, datetime.date):
|
|
if o.tzinfo is None:
|
|
o = o.replace(tzinfo=pytz.utc)
|
|
return o.isoformat()
|
|
|
|
# Return the repr for object that json can't encode
|
|
logger.warning(
|
|
"cannot properly encode in JSON the object %s, " "returned repr is: %r",
|
|
type(o),
|
|
o,
|
|
)
|
|
return repr(o)
|