Merge pull request #486 from YunoHost/app_actions

[enh] POC for app actions
This commit is contained in:
Bram 2018-06-22 05:32:22 +02:00 committed by GitHub
commit 434e438f94
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 240 additions and 122 deletions

View file

@ -773,6 +773,32 @@ app:
subcategories:
action:
subcategory_help: Handle apps actions
actions:
### app_action_list()
list:
action_help: List app actions
api: GET /<app>/actions
arguments:
app_id:
help: app id
### app_action_run()
run:
action_help: Run app action
api: PUT /<app>/actions/<action>
arguments:
app_id:
help: app id
action:
help: action id
-a:
full: --args
help: Serialized arguments for app script (i.e. "domain=domain.tld&path=/path")
config:
subcategory_help: Applications configuration panel
actions:

View file

@ -641,11 +641,9 @@ def app_upgrade(auth, app=[], url=None, file=None):
os.system('rm -rf "%s/scripts" "%s/manifest.json %s/conf"' % (app_setting_path, app_setting_path, app_setting_path))
os.system('mv "%s/manifest.json" "%s/scripts" %s' % (extracted_app_folder, extracted_app_folder, app_setting_path))
if os.path.exists(os.path.join(extracted_app_folder, "config_panel.json")):
os.system('cp -R %s/config_panel.json %s' % (extracted_app_folder, app_setting_path))
if os.path.exists(os.path.join(extracted_app_folder, "conf")):
os.system('cp -R %s/conf %s' % (extracted_app_folder, app_setting_path))
for file_to_copy in ["actions.json", "config_panel.json", "conf"]:
if os.path.exists(os.path.join(extracted_app_folder, file_to_copy)):
os.system('cp -R %s/%s %s' % (extracted_app_folder, file_to_copy, app_setting_path))
# So much win
upgraded_apps.append(app_instance_name)
@ -761,11 +759,9 @@ def app_install(auth, app, label=None, args=None, no_remove_on_failure=False):
os.system('cp %s/manifest.json %s' % (extracted_app_folder, app_setting_path))
os.system('cp -R %s/scripts %s' % (extracted_app_folder, app_setting_path))
if os.path.exists(os.path.join(extracted_app_folder, "config_panel.json")):
os.system('cp -R %s/config_panel.json %s' % (extracted_app_folder, app_setting_path))
if os.path.exists(os.path.join(extracted_app_folder, "conf")):
os.system('cp -R %s/conf %s' % (extracted_app_folder, app_setting_path))
for file_to_copy in ["actions.json", "config_panel.json", "conf"]:
if os.path.exists(os.path.join(extracted_app_folder, file_to_copy)):
os.system('cp -R %s/%s %s' % (extracted_app_folder, file_to_copy, app_setting_path))
# Execute the app install script
install_retcode = 1
@ -1367,6 +1363,70 @@ def app_change_label(auth, app, new_label):
app_ssowatconf(auth)
# actions todo list:
# * docstring
def app_action_list(app_id):
logger.warning(m18n.n('experimental_feature'))
installed = _is_installed(app_id)
if not installed:
raise MoulinetteError(errno.ENOPKG,
m18n.n('app_not_installed', app=app_id))
actions = os.path.join(APPS_SETTING_PATH, app_id, 'actions.json')
return {
"actions": read_json(actions) if os.path.exists(actions) else [],
}
def app_action_run(app_id, action, args=None):
logger.warning(m18n.n('experimental_feature'))
from yunohost.hook import hook_exec
import tempfile
# will raise if action doesn't exist
actions = app_action_list(app_id)["actions"]
if action not in actions:
raise MoulinetteError(errno.EINVAL, "action '%s' not available for app '%s', available actions are: %s" % (action, app_id, ", ".join(actions.keys())))
action_declaration = actions[action]
# Retrieve arguments list for install script
args_dict = dict(urlparse.parse_qsl(args, keep_blank_values=True)) if args else {}
args_odict = _parse_args_for_action(actions[action], args=args_dict)
args_list = args_odict.values()
env_dict = _make_environment_dict(args_odict, prefix="ACTION_")
env_dict["YNH_APP_ID"] = app_id
env_dict["YNH_ACTION"] = action
_, path = tempfile.mkstemp()
with open(path, "w") as script:
script.write(action_declaration["command"])
os.chmod(path, 700)
retcode = hook_exec(
path,
args=args_list,
env=env_dict,
chdir=action_declaration.get("cwd"),
user=action_declaration.get("user", "root"),
)
if retcode not in action_declaration.get("accepted_return_codes", [0]):
raise MoulinetteError(retcode, "Error while executing action '%s' of app '%s': return code %s" % (action, app_id, retcode))
os.remove(path)
return logger.success("Action successed!")
# Config panel todo list:
# * docstrings
# * merge translations on the json once the workflow is in place
@ -1945,142 +2005,173 @@ def _parse_args_from_manifest(manifest, action, args={}, auth=None):
args -- A dictionnary of arguments to parse
"""
from yunohost.domain import (domain_list, _get_maindomain,
domain_url_available, _normalize_domain_path)
from yunohost.user import user_info
args_dict = OrderedDict()
try:
action_args = manifest['arguments'][action]
except KeyError:
logger.debug("no arguments found for '%s' in manifest", action)
else:
for arg in action_args:
arg_name = arg['name']
arg_type = arg.get('type', 'string')
arg_default = arg.get('default', None)
arg_choices = arg.get('choices', [])
arg_value = None
return _parse_action_args_in_yunohost_format(args, action_args, auth)
# Transpose default value for boolean type and set it to
# false if not defined.
if arg_type == 'boolean':
arg_default = 1 if arg_default else 0
# Attempt to retrieve argument value
if arg_name in args:
arg_value = args[arg_name]
else:
if 'ask' in arg:
# Retrieve proper ask string
ask_string = _value_for_locale(arg['ask'])
def _parse_args_for_action(action, args={}, auth=None):
"""Parse arguments needed for an action from the actions list
# Append extra strings
if arg_type == 'boolean':
ask_string += ' [0 | 1]'
elif arg_choices:
ask_string += ' [{0}]'.format(' | '.join(arg_choices))
if arg_default is not None:
ask_string += ' (default: {0})'.format(arg_default)
Retrieve specified arguments for the action from the manifest, and parse
given args according to that. If some required arguments are not provided,
its values will be asked if interaction is possible.
Parsed arguments will be returned as an OrderedDict
# Check for a password argument
is_password = True if arg_type == 'password' else False
Keyword arguments:
action -- The action
args -- A dictionnary of arguments to parse
if arg_type == 'domain':
arg_default = _get_maindomain()
ask_string += ' (default: {0})'.format(arg_default)
msignals.display(m18n.n('domains_available'))
for domain in domain_list(auth)['domains']:
msignals.display("- {}".format(domain))
"""
args_dict = OrderedDict()
try:
input_string = msignals.prompt(ask_string, is_password)
except NotImplementedError:
input_string = None
if (input_string == '' or input_string is None) \
and arg_default is not None:
arg_value = arg_default
else:
arg_value = input_string
elif arg_default is not None:
arg_value = arg_default
if 'arguments' not in action:
logger.debug("no arguments found for '%s' in manifest", action)
return args_dict
# Validate argument value
if (arg_value is None or arg_value == '') \
and not arg.get('optional', False):
raise MoulinetteError(errno.EINVAL,
m18n.n('app_argument_required', name=arg_name))
elif arg_value is None:
args_dict[arg_name] = ''
continue
action_args = action['arguments']
# Validate argument choice
if arg_choices and arg_value not in arg_choices:
raise MoulinetteError(errno.EINVAL,
m18n.n('app_argument_choice_invalid',
name=arg_name, choices=', '.join(arg_choices)))
return _parse_action_args_in_yunohost_format(args, action_args, auth)
def _parse_action_args_in_yunohost_format(args, action_args, auth=None):
"""Parse arguments store in either manifest.json or actions.json
"""
from yunohost.domain import (domain_list, _get_maindomain,
domain_url_available, _normalize_domain_path)
from yunohost.user import user_info
args_dict = OrderedDict()
for arg in action_args:
arg_name = arg['name']
arg_type = arg.get('type', 'string')
arg_default = arg.get('default', None)
arg_choices = arg.get('choices', [])
arg_value = None
# Transpose default value for boolean type and set it to
# false if not defined.
if arg_type == 'boolean':
arg_default = 1 if arg_default else 0
# Attempt to retrieve argument value
if arg_name in args:
arg_value = args[arg_name]
else:
if 'ask' in arg:
# Retrieve proper ask string
ask_string = _value_for_locale(arg['ask'])
# Append extra strings
if arg_type == 'boolean':
ask_string += ' [0 | 1]'
elif arg_choices:
ask_string += ' [{0}]'.format(' | '.join(arg_choices))
if arg_default is not None:
ask_string += ' (default: {0})'.format(arg_default)
# Check for a password argument
is_password = True if arg_type == 'password' else False
if arg_type == 'domain':
arg_default = _get_maindomain()
ask_string += ' (default: {0})'.format(arg_default)
msignals.display(m18n.n('domains_available'))
for domain in domain_list(auth)['domains']:
msignals.display("- {}".format(domain))
# Validate argument type
if arg_type == 'domain':
if arg_value not in domain_list(auth)['domains']:
raise MoulinetteError(errno.EINVAL,
m18n.n('app_argument_invalid',
name=arg_name, error=m18n.n('domain_unknown')))
elif arg_type == 'user':
try:
user_info(auth, arg_value)
except MoulinetteError as e:
raise MoulinetteError(errno.EINVAL,
m18n.n('app_argument_invalid',
name=arg_name, error=e.strerror))
elif arg_type == 'app':
if not _is_installed(arg_value):
raise MoulinetteError(errno.EINVAL,
m18n.n('app_argument_invalid',
name=arg_name, error=m18n.n('app_unknown')))
elif arg_type == 'boolean':
if isinstance(arg_value, bool):
arg_value = 1 if arg_value else 0
input_string = msignals.prompt(ask_string, is_password)
except NotImplementedError:
input_string = None
if (input_string == '' or input_string is None) \
and arg_default is not None:
arg_value = arg_default
else:
try:
arg_value = int(arg_value)
if arg_value not in [0, 1]:
raise ValueError()
except (TypeError, ValueError):
raise MoulinetteError(errno.EINVAL,
m18n.n('app_argument_choice_invalid',
name=arg_name, choices='0, 1'))
args_dict[arg_name] = arg_value
arg_value = input_string
elif arg_default is not None:
arg_value = arg_default
# END loop over action_args...
# Validate argument value
if (arg_value is None or arg_value == '') \
and not arg.get('optional', False):
raise MoulinetteError(errno.EINVAL,
m18n.n('app_argument_required', name=arg_name))
elif arg_value is None:
args_dict[arg_name] = ''
continue
# If there's only one "domain" and "path", validate that domain/path
# is an available url and normalize the path.
# Validate argument choice
if arg_choices and arg_value not in arg_choices:
raise MoulinetteError(errno.EINVAL,
m18n.n('app_argument_choice_invalid',
name=arg_name, choices=', '.join(arg_choices)))
domain_args = [arg["name"] for arg in action_args
if arg.get("type", "string") == "domain"]
path_args = [arg["name"] for arg in action_args
if arg.get("type", "string") == "path"]
if len(domain_args) == 1 and len(path_args) == 1:
domain = args_dict[domain_args[0]]
path = args_dict[path_args[0]]
domain, path = _normalize_domain_path(domain, path)
# Check the url is available
if not domain_url_available(auth, domain, path):
# Validate argument type
if arg_type == 'domain':
if arg_value not in domain_list(auth)['domains']:
raise MoulinetteError(errno.EINVAL,
m18n.n('app_location_unavailable'))
m18n.n('app_argument_invalid',
name=arg_name, error=m18n.n('domain_unknown')))
elif arg_type == 'user':
try:
user_info(auth, arg_value)
except MoulinetteError as e:
raise MoulinetteError(errno.EINVAL,
m18n.n('app_argument_invalid',
name=arg_name, error=e.strerror))
elif arg_type == 'app':
if not _is_installed(arg_value):
raise MoulinetteError(errno.EINVAL,
m18n.n('app_argument_invalid',
name=arg_name, error=m18n.n('app_unknown')))
elif arg_type == 'boolean':
if isinstance(arg_value, bool):
arg_value = 1 if arg_value else 0
else:
try:
arg_value = int(arg_value)
if arg_value not in [0, 1]:
raise ValueError()
except (TypeError, ValueError):
raise MoulinetteError(errno.EINVAL,
m18n.n('app_argument_choice_invalid',
name=arg_name, choices='0, 1'))
args_dict[arg_name] = arg_value
# (We save this normalized path so that the install script have a
# standard path format to deal with no matter what the user inputted)
args_dict[path_args[0]] = path
# END loop over action_args...
# If there's only one "domain" and "path", validate that domain/path
# is an available url and normalize the path.
domain_args = [arg["name"] for arg in action_args
if arg.get("type", "string") == "domain"]
path_args = [arg["name"] for arg in action_args
if arg.get("type", "string") == "path"]
if len(domain_args) == 1 and len(path_args) == 1:
domain = args_dict[domain_args[0]]
path = args_dict[path_args[0]]
domain, path = _normalize_domain_path(domain, path)
# Check the url is available
if not domain_url_available(auth, domain, path):
raise MoulinetteError(errno.EINVAL,
m18n.n('app_location_unavailable'))
# (We save this normalized path so that the install script have a
# standard path format to deal with no matter what the user inputted)
args_dict[path_args[0]] = path
return args_dict
def _make_environment_dict(args_dict):
def _make_environment_dict(args_dict, prefix="APP_ARG_"):
"""
Convert a dictionnary containing manifest arguments
to a dictionnary of env. var. to be passed to scripts
@ -2091,7 +2182,7 @@ def _make_environment_dict(args_dict):
"""
env_dict = {}
for arg_name, arg_value in args_dict.items():
env_dict["YNH_APP_ARG_%s" % arg_name.upper()] = arg_value
env_dict["YNH_%s%s" % (prefix, arg_name.upper())] = arg_value
return env_dict

View file

@ -365,6 +365,7 @@ def hook_exec(path, args=None, raise_on_error=False, no_trace=False,
stdout_callback if stdout_callback else lambda l: logger.debug(l.rstrip()),
stderr_callback if stderr_callback else lambda l: logger.warning(l.rstrip()),
)
logger.debug("About to run the command '%s'" % command)
returncode = call_async_output(
command, callbacks, shell=False, cwd=chdir
)