# -*- coding: utf-8 -*- """ License Copyright (C) 2013 YunoHost This program is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program; if not, see http://www.gnu.org/licenses """ """ yunohost_app.py Manage apps """ import os import sys import json import shutil import stat import yaml import time import re import socket import urlparse from yunohost import YunoHostError, YunoHostLDAP, win_msg, random_password, is_true, validate from yunohost_domain import domain_list, domain_add from yunohost_user import user_info, user_list from yunohost_hook import hook_exec repo_path = '/var/cache/yunohost/repo' apps_path = '/usr/share/yunohost/apps' apps_setting_path= '/etc/yunohost/apps/' install_tmp = '/var/cache/yunohost' app_tmp_folder = install_tmp + '/from_file' def app_listlists(): """ List fetched lists """ list_list = [] try: for filename in os.listdir(repo_path): if '.json' in filename: list_list.append(filename[:len(filename)-5]) except OSError: raise YunoHostError(1, _("No list found")) return { 'Lists' : list_list } def app_fetchlist(url=None, name=None): """ Fetch application list from app server Keyword argument: name -- Name of the list (default fapp) url -- URL of remote JSON list (default http://fapp.yunohost.org/app/list/raw) """ # Create app path if not exists try: os.listdir(repo_path) except OSError: os.makedirs(repo_path) if url is None: url = 'http://app.yunohost.org/list.json' name = 'yunohost' else: if name is None: raise YunoHostError(22, _("You must indicate a name for your custom list")) if os.system('wget "'+ url +'" -O "'+ repo_path +'/'+ name +'.json"') != 0: raise YunoHostError(1, _("List server connection failed")) os.system("touch /etc/cron.d/yunohost-applist-"+ name) os.system("echo '00 00 * * * root yunohost app fetchlist -u "+ url +" -n "+ name +" --no-ldap >> /dev/null' >/etc/cron.d/yunohost-applist-"+ name) win_msg(_("List successfully fetched")) def app_removelist(name): """ Remove list from the repositories Keyword argument: name -- Name of the list to remove """ try: os.remove(repo_path +'/'+ name + '.json') os.remove("/etc/cron.d/yunohost-applist-"+ name) except OSError: raise YunoHostError(22, _("Unknown list")) win_msg(_("List successfully removed")) def app_list(offset=None, limit=None, filter=None, raw=False): """ List apps Keyword argument: filter -- Name filter of app_id or app_name offset -- Starting number for app fetching limit -- Maximum number of app fetched raw -- Return the full app_dict """ # TODO: List installed applications if offset: offset = int(offset) else: offset = 0 if limit: limit = int(limit) else: limit = 1000 applists = os.listdir(repo_path) app_dict = {} if raw: list_dict = {} else: list_dict=[] if not applists: app_fetchlist() applists = os.listdir(repo_path) for applist in applists: if '.json' in applist: with open(repo_path +'/'+ applist) as json_list: app_dict.update(json.loads(str(json_list.read()))) for app in os.listdir(apps_setting_path): if app not in app_dict: # Look for forks if '__' in app: original_app = app[:app.index('__')] if original_app in app_dict: app_dict[app] = app_dict[original_app] continue with open( apps_setting_path + app +'/manifest.json') as json_manifest: app_dict[app] = {"manifest":json.loads(str(json_manifest.read()))} app_dict[app]['manifest']['orphan']=True if len(app_dict) > (0 + offset) and limit > 0: sorted_app_dict = {} for sorted_keys in sorted(app_dict.keys())[offset:]: sorted_app_dict[sorted_keys] = app_dict[sorted_keys] i = 0 for app_id, app_info in sorted_app_dict.items(): if i < limit: if (filter and ((filter in app_id) or (filter in app_info['manifest']['name']))) or not filter: installed = _is_installed(app_id) if raw: app_info['installed'] = installed list_dict[app_id] = app_info else: list_dict.append({ 'ID': app_id, 'Name': app_info['manifest']['name'], 'Description': app_info['manifest']['description'], 'Installed': installed }) i += 1 else: break if not raw: list_dict = { 'Apps': list_dict } return list_dict def app_info(app, raw=False): """ Get app info Keyword argument: app -- Specific app ID raw -- Return the full app_dict """ try: app_info = app_list(filter=app, raw=True)[app] except: app_info = {} if _is_installed(app): with open(apps_setting_path + app +'/settings.yml') as f: app_info['settings'] = yaml.load(f) if raw: return app_info else: return { 'Name': app_info['manifest']['name'], 'Description': app_info['manifest']['description']['en'], #TODO: Add more infos } def app_map(app=None, raw=False, user=None): """ List apps by domain Keyword argument: user -- Allowed app map for a user raw -- Return complete dict app -- Specific app to map """ result = {} for app_id in os.listdir(apps_setting_path): if app and (app != app_id): continue if user is not None: app_dict = app_info(app=app_id, raw=True) if ('mode' not in app_dict['settings']) or ('mode' in app_dict['settings'] and app_dict['settings']['mode'] == 'private'): if 'allowed_users' in app_dict['settings'] and user not in app_dict['settings']['allowed_users'].split(','): continue with open(apps_setting_path + app_id +'/settings.yml') as f: app_settings = yaml.load(f) if 'domain' not in app_settings: continue if raw: if app_settings['domain'] not in result: result[app_settings['domain']] = {} result[app_settings['domain']][app_settings['path']] = { 'label': app_settings['label'], 'id': app_settings['id'] } else: result[app_settings['domain']+app_settings['path']] = app_settings['label'] return result def app_upgrade(app, url=None, file=None): """ Upgrade app Keyword argument: file -- Folder or tarball for upgrade app -- App(s) to upgrade (default all) url -- Git url to fetch for upgrade """ with YunoHostLDAP() as yldap: try: app_list() except YunoHostError: raise YunoHostError(1, _("No app to upgrade")) upgraded_apps = [] # If no app is specified, upgrade all apps if not app: app = os.listdir(apps_setting_path) elif not isinstance(app, list): app = [ app ] for app_id in app: installed = _is_installed(app_id) if not installed: raise YunoHostError(1, app_id + _(" is not installed")) if app_id in upgraded_apps: continue if '__' in app_id: original_app_id = app_id[:app_id.index('__')] else: original_app_id = app_id current_app_dict = app_info(app_id, raw=True) new_app_dict = app_info(original_app_id, raw=True) if file: manifest = _extract_app_from_file(file) elif url: manifest = _fetch_app_from_git(url) elif (new_app_dict['lastUpdate'] > current_app_dict['lastUpdate']) or ('update_time' not in current_app_dict['settings'] and (new_app_dict['lastUpdate'] > current_app_dict['settings']['install_time'])) or ('update_time' in current_app_dict['settings'] and (new_app_dict['lastUpdate'] > current_app_dict['settings']['update_time'])): manifest = _fetch_app_from_git(app_id) else: continue app_setting_path = apps_setting_path +'/'+ app_id if original_app_id != app_id: # Replace original_app_id with the forked one in scripts for file in os.listdir(app_tmp_folder +'/scripts'): #TODO: add hooks directory to the list #TODO: do it with sed ? if file[:1] != '.': with open(app_tmp_folder +'/scripts/'+ file, "r") as sources: lines = sources.readlines() with open(app_tmp_folder +'/scripts/'+ file, "w") as sources: for line in lines: sources.write(re.sub(r''+ original_app_id +'', app_id, line)) # Execute App upgrade script os.system('chown -hR admin: '+ install_tmp) if hook_exec(app_tmp_folder +'/scripts/upgrade') != 0: #TODO: display fail messages from script pass else: app_setting(app_id, 'update_time', int(time.time())) # Move scripts and manifest to the right place os.system('mv "'+ app_tmp_folder +'/manifest.json" "'+ app_tmp_folder +'/scripts" '+ app_setting_path) # So much win upgraded_apps.append(app_id) win_msg(app_id + _(" upgraded successfully")) if not upgraded_apps: raise YunoHostError(1, _("No app to upgrade")) win_msg(_("Upgrade complete")) def app_install(app, label=None, args=None): """ Install apps Keyword argument: label app -- App to install args -- Serialize arguments of installation """ #TODO: Create tool for nginx (check path availability & stuff) with YunoHostLDAP() as yldap: # Fetch or extract sources try: os.listdir(install_tmp) except OSError: os.makedirs(install_tmp) if app in app_list(raw=True) or ('@' in app) or ('http://' in app) or ('https://' in app): manifest = _fetch_app_from_git(app) else: manifest = _extract_app_from_file(app) # Check ID if 'id' not in manifest or '__' in manifest['id']: raise YunoHostError(22, _("App id is invalid")) app_id = manifest['id'] # Check if app can be forked instance_number = _installed_instance_number(app_id, last=True) + 1 if instance_number > 1 : if 'multi_instance' not in manifest or not is_true(manifest['multi_instance']): raise YunoHostError(1, _("App is already installed")) app_id_forked = app_id + '__' + str(instance_number) # Replace app_id with the new one in scripts for file in os.listdir(app_tmp_folder +'/scripts'): #TODO: add hooks directory to the list #TODO: do it with sed ? if file[:1] != '.': with open(app_tmp_folder +'/scripts/'+ file, "r") as sources: lines = sources.readlines() with open(app_tmp_folder +'/scripts/'+ file, "w") as sources: for line in lines: sources.write(re.sub(r''+ app_id +'', app_id_forked, line)) # Change app_id for the rest of the process app_id = app_id_forked # Prepare App settings app_setting_path = apps_setting_path +'/'+ app_id #TMP: Remove old settings if os.path.exists(app_setting_path): shutil.rmtree(app_setting_path) os.makedirs(app_setting_path) os.system('touch '+ app_setting_path +'/settings.yml') app_setting(app_id, 'id', app_id) app_setting(app_id, 'install_time', int(time.time())) if label: app_setting(app_id, 'label', label) else: app_setting(app_id, 'label', manifest['name']) os.system('chown -R admin: '+ app_tmp_folder) try: if args is None: args = '' args_dict = dict(urlparse.parse_qsl(args)) except: args_dict = {} # Execute App install script os.system('chown -hR admin: '+ install_tmp) # Move scripts and manifest to the right place os.system('cp '+ app_tmp_folder +'/manifest.json ' + app_setting_path) os.system('cp -R ' + app_tmp_folder +'/scripts '+ app_setting_path) if hook_exec(app_tmp_folder + '/scripts/install', args_dict) == 0: shutil.rmtree(app_tmp_folder) os.system('chmod -R 400 '+ app_setting_path) os.system('chown -R root: '+ app_setting_path) os.system('chown -R admin: '+ app_setting_path +'/scripts') app_ssowatconf() win_msg(_("Installation complete")) else: #TODO: display script fail messages shutil.rmtree(app_setting_path) shutil.rmtree(app_tmp_folder) raise YunoHostError(1, _("Installation failed")) def app_remove(app): """ Remove app Keyword argument: app -- App(s) to delete """ if not _is_installed(app): raise YunoHostError(22, _("App is not installed")) app_setting_path = apps_setting_path + app #TODO: display fail messages from script try: shutil.rmtree('/tmp/yunohost_remove') except: pass os.system('cp -a '+ app_setting_path + ' /tmp/yunohost_remove && chown -hR admin: /tmp/yunohost_remove') os.system('chown -R admin: /tmp/yunohost_remove') os.system('chmod -R u+rX /tmp/yunohost_remove') if hook_exec('/tmp/yunohost_remove/scripts/remove') != 0: pass if os.path.exists(app_setting_path): shutil.rmtree(app_setting_path) shutil.rmtree('/tmp/yunohost_remove') app_ssowatconf() win_msg(_("App removed: ")+ app) def app_addaccess(apps, users): """ Grant access right to users (everyone by default) Keyword argument: users apps """ #TODO: Adapt to SSOwat if not isinstance(users, list): users = [users] if not isinstance(apps, list): apps = [apps] for app in apps: if not _is_installed(app): raise YunoHostError(22, _("App is not installed")) with open(apps_setting_path + app +'/settings.yml') as f: app_settings = yaml.load(f) if 'mode' not in app_settings: app_setting(app, 'mode', 'private') app_settings['mode'] = 'private' if app_settings['mode'] == 'private': if 'allowed_users' in app_settings: new_users = app_settings['allowed_users'] else: new_users = '' for allowed_user in users: if allowed_user not in new_users.split(','): try: user_info(allowed_user) except YunoHostError: continue if new_users == '': new_users = allowed_user else: new_users = new_users +','+ allowed_user app_setting(app, 'allowed_users', new_users.strip()) app_ssowatconf() def app_removeaccess(apps, users): """ Revoke access right to users (everyone by default) Keyword argument: users apps """ #TODO: Remove access if not isinstance(users, list): users = [users] if not isinstance(apps, list): apps = [apps] for app in apps: new_users = '' if not _is_installed(app): raise YunoHostError(22, _("App is not installed")) with open(apps_setting_path + app +'/settings.yml') as f: app_settings = yaml.load(f) if 'mode' in app_settings and app_settings['mode'] == 'private': if 'allowed_users' in app_settings: for allowed_user in app_settings['allowed_users'].split(','): if allowed_user not in users: if new_users == '': new_users = allowed_user else: new_users = new_users +','+ allowed_user app_setting(app, 'allowed_users', new_users.strip()) app_ssowatconf() def app_setting(app, key, value=None): """ Set ou get an app setting value Keyword argument: value -- Value to set app -- App ID key -- Key to get/set """ settings_file = apps_setting_path + app +'/settings.yml' try: with open(settings_file) as f: app_settings = yaml.load(f) except IOError: # Do not fail if setting file is not there app_settings = {} if value is None: # Get the value if app_settings is not None and key in app_settings: print(app_settings[key]) else: # Set the value if app_settings is None: app_settings = {} if value == '' and key in app_settings: del app_settings[key] else: app_settings[key] = value with open(settings_file, 'w') as f: yaml.safe_dump(app_settings, f, default_flow_style=False) def app_checkport(port): """ Check availability of a local port Keyword argument: port -- Port to check """ try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(1) s.connect(("localhost", int(port))) s.close() except socket.error: win_msg(_("Port available: ")+ str(port)) else: raise YunoHostError(22, _("Port not available")) def app_checkurl(url, app=None): """ Check availability of a web path Keyword argument: url -- Url to check app -- Write domain & path to app settings for further checks """ if "https://" == url[:8]: url = url[8:] elif "http://" == url[:7]: url = url[7:] if url[-1:] != '/': url = url + '/' domain = url[:url.index('/')] path = url[url.index('/'):] if path[-1:] != '/': path = path + '/' apps_map = app_map(raw=True) validate(r'^([a-zA-Z0-9]{1}([a-zA-Z0-9\-]*[a-zA-Z0-9])*)(\.[a-zA-Z0-9]{1}([a-zA-Z0-9\-]*[a-zA-Z0-9])*)*(\.[a-zA-Z]{1}([a-zA-Z0-9\-]*[a-zA-Z0-9])*)$', domain) if domain not in domain_list()['Domains']: raise YunoHostError(22, _("Domain doesn't exists")) if domain in apps_map: if path in apps_map[domain]: raise YunoHostError(1, _("An app is already installed on this location")) for app_path, v in apps_map[domain].items(): if app_path in path and app_path.count('/') < path.count('/'): raise YunoHostError(1, _("Unable to install app at this location")) if app is not None: app_setting(app, 'domain', value=domain) app_setting(app, 'path', value=path) def app_initdb(user, password=None, db=None, sql=None): """ Create database and initialize it with optionnal attached script Keyword argument: db -- DB name (user unless set) user -- Name of the DB user password -- Password of the DB (generated unless set) sql -- Initial SQL file """ if db is None: db = user return_pwd = False if password is None: password = random_password(12) return_pwd = True print(password) mysql_root_pwd = open('/etc/yunohost/mysql').read().rstrip() mysql_command = 'mysql -u root -p'+ mysql_root_pwd +' -e "CREATE DATABASE '+ db +' ; GRANT ALL PRIVILEGES ON '+ db +'.* TO \''+ user +'\'@localhost IDENTIFIED BY \''+ password +'\';"' if os.system(mysql_command) != 0: raise YunoHostError(1, _("MySQL DB creation failed")) if sql is not None: if os.system('mysql -u '+ user +' -p'+ password +' '+ db +' < '+ sql) != 0: raise YunoHostError(1, _("MySQL DB init failed")) if not return_pwd: win_msg(_("Database initiliazed")) def app_ssowatconf(): """ Regenerate SSOwat configuration file """ with open('/etc/yunohost/current_host', 'r') as f: main_domain = f.readline().rstrip() domains = domain_list()['Domains'] users = {} for user in user_list()['Users']: users[user['Username']] = app_map(user=user['Username']) skipped_uri=[] apps={} for app in app_list()['Apps']: with open(apps_setting_path + app['ID'] +'/settings.yml') as f: app_settings = yaml.load(f) if 'skipped_uris' in app_settings: skipped_uri=[app_settings['domain'] + app_settings['path'][:-1] + item for item in app_settings['skipped_uris'].split(',')] for domain in domains: skipped_uri.extend([domain +'/ynhadmin', domain +'/ynhapi']) conf_dict = { 'portal_domain': main_domain, 'portal_path': '/ynhsso/', 'portal_port': '443', 'portal_scheme': 'https', 'additional_headers': { 'Auth-User': 'uid', 'Remote-User': 'uid', 'Name': 'cn', 'Email': 'mail' }, 'domains': domains, 'skipped_urls': skipped_uri, 'unprotected_urls': [], 'users': users } with open('/etc/ssowat/conf.json', 'wb') as f: json.dump(conf_dict, f) win_msg(_('SSOwat configuration generated')) def _extract_app_from_file(path, remove=False): """ Unzip or untar application tarball in app_tmp_folder, or copy it from a directory Keyword arguments: path -- Path of the tarball or directory remove -- Remove the tarball after extraction Returns: Dict manifest """ global app_tmp_folder if os.path.exists(app_tmp_folder): shutil.rmtree(app_tmp_folder) os.makedirs(app_tmp_folder) if ".zip" in path: extract_result = os.system('cd '+ os.getcwd() +' && unzip '+ path +' -d '+ app_tmp_folder) if remove: os.remove(path) elif ".tar" in path: extract_result = os.system('cd '+ os.getcwd() +' && tar -xf '+ path +' -C '+ app_tmp_folder) if remove: os.remove(path) elif (path[:1] == '/' and os.path.exists(path)) or (os.system('cd '+ os.getcwd() +'/'+ path) == 0): shutil.rmtree(app_tmp_folder) if path[len(path)-1:] != '/': path = path + '/' extract_result = os.system('cd '+ os.getcwd() +' && cp -a "'+ path +'" '+ app_tmp_folder) else: extract_result = 1 if extract_result != 0: raise YunoHostError(22, _("Invalid install file")) try: if len(os.listdir(app_tmp_folder)) == 1: for folder in os.listdir(app_tmp_folder): app_tmp_folder = app_tmp_folder +'/'+ folder with open(app_tmp_folder + '/manifest.json') as json_manifest: manifest = json.loads(str(json_manifest.read())) manifest['lastUpdate'] = int(time.time()) except IOError: raise YunoHostError(1, _("Invalid App file")) win_msg(_("Sources extracted")) return manifest def _fetch_app_from_git(app): """ Unzip or untar application tarball in app_tmp_folder Keyword arguments: app -- App_id or git repo URL Returns: Dict manifest """ global app_tmp_folder if ('@' in app) or ('http://' in app) or ('https://' in app): if "github.com" in app: url = app.replace("git@github.com:", "https://github.com/") if ".git" in url[-4:]: url = url[:-4] if "/" in url [-1:]: url = url[:-1] url = url + "/archive/master.zip" if os.system('wget "'+ url +'" -O "'+ app_tmp_folder +'.zip"') == 0: return _extract_app_from_file(app_tmp_folder +'.zip', remove=True) git_result = os.system('git clone '+ app +' '+ app_tmp_folder) git_result_2 = 0 try: with open(app_tmp_folder + '/manifest.json') as json_manifest: manifest = json.loads(str(json_manifest.read())) manifest['lastUpdate'] = int(time.time()) except IOError: raise YunoHostError(1, _("Invalid App manifest")) else: app_dict = app_list(raw=True) if app in app_dict: app_info = app_dict[app] app_info['manifest']['lastUpdate'] = app_info['lastUpdate'] manifest = app_info['manifest'] else: raise YunoHostError(22, _("App doesn't exists")) if "github.com" in app_info['git']['url']: url = app_info['git']['url'].replace("git@github.com:", "https://github.com/") if ".git" in url[-4:]: url = url[:-4] if "/" in url [-1:]: url = url[:-1] url = url + "/archive/"+ str(app_info['git']['revision']) + ".zip" if os.system('wget "'+ url +'" -O "'+ app_tmp_folder +'.zip"') == 0: return _extract_app_from_file(app_tmp_folder +'.zip', remove=True) app_tmp_folder = install_tmp +'/'+ app if os.path.exists(app_tmp_folder): shutil.rmtree(app_tmp_folder) git_result = os.system('git clone '+ app_info['git']['url'] +' -b '+ app_info['git']['branch'] +' '+ app_tmp_folder) git_result_2 = os.system('cd '+ app_tmp_folder +' && git reset --hard '+ str(app_info['git']['revision'])) if not git_result == git_result_2 == 0: raise YunoHostError(22, _("Sources fetching failed")) win_msg(_("Repository fetched")) return manifest def _installed_instance_number(app, last=False): """ Check if application is installed and return instance number Keyword arguments: app -- id of App to check last -- Return only last instance number Returns: Number of last installed instance | List or instances """ if last: number = 0 try: installed_apps = os.listdir(apps_setting_path) except OSError: os.makedirs(apps_setting_path) return 0 for installed_app in installed_apps: if number == 0 and app == installed_app: number = 1 elif '__' in installed_app: if app == installed_app[:installed_app.index('__')]: if int(installed_app[installed_app.index('__') + 2:]) > number: number = int(installed_app[installed_app.index('__') + 2:]) return number else: instance_number_list = [] instances_dict = app_map(app=app, raw=True) for key, domain in instances_dict.items(): for key, path in domain.items(): instance_number_list.append(path['instance']) return sorted(instance_number_list) def _is_installed(app): """ Check if application is installed Keyword arguments: app -- id of App to check Returns: Boolean """ try: installed_apps = os.listdir(apps_setting_path) except OSError: os.makedirs(apps_setting_path) return False for installed_app in installed_apps: if app == installed_app: return True else: continue return False