From 803f379c815b9fc8a8215d603aaa5d01f116b5e5 Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Tue, 19 Sep 2023 01:15:01 +0200 Subject: [PATCH] appstore: black app.py and utils.py --- store/app.py | 224 ++++++++++++++++++++++++++++++++++++------------- store/utils.py | 50 ++++++----- 2 files changed, 198 insertions(+), 76 deletions(-) diff --git a/store/app.py b/store/app.py index 9d9ae36e..d54bea73 100644 --- a/store/app.py +++ b/store/app.py @@ -10,18 +10,33 @@ import urllib import json import sys from slugify import slugify -from flask import Flask, send_from_directory, render_template, session, redirect, request +from flask import ( + Flask, + send_from_directory, + render_template, + session, + redirect, + request, +) from flask_babel import Babel from flask_babel import gettext as _ from github import Github, InputGitAuthor -from .utils import get_locale, get_catalog, get_wishlist, get_stars, get_app_md_and_screenshots +from .utils import ( + get_locale, + get_catalog, + get_wishlist, + get_stars, + get_app_md_and_screenshots, +) -app = Flask(__name__, static_url_path='/assets', static_folder="assets") +app = Flask(__name__, static_url_path="/assets", static_folder="assets") try: config = toml.loads(open("config.toml").read()) except Exception as e: - print("You should create a config.toml with the appropriate key/values, cf config.toml.example") + print( + "You should create a config.toml with the appropriate key/values, cf config.toml.example" + ) sys.exit(1) mandatory_config_keys = [ @@ -43,14 +58,15 @@ for key in mandatory_config_keys: if config.get("DEBUG"): app.debug = True app.config["DEBUG"] = True - app.config['TEMPLATES_AUTO_RELOAD'] = True + app.config["TEMPLATES_AUTO_RELOAD"] = True # This is the secret key used for session signing app.secret_key = config["COOKIE_SECRET"] babel = Babel(app, locale_selector=get_locale) -@app.template_filter('localize') + +@app.template_filter("localize") def localize(d): if not isinstance(d, dict): return d @@ -61,19 +77,26 @@ def localize(d): else: return d["en"] + ############################################################################### -@app.route('/favicon.ico') + +@app.route("/favicon.ico") def favicon(): - return send_from_directory('assets', 'favicon.png') + return send_from_directory("assets", "favicon.png") -@app.route('/') +@app.route("/") def index(): - return render_template("index.html", locale=get_locale(), user=session.get('user', {}), catalog=get_catalog()) + return render_template( + "index.html", + locale=get_locale(), + user=session.get("user", {}), + catalog=get_catalog(), + ) -@app.route('/catalog') +@app.route("/catalog") def browse_catalog(): return render_template( "catalog.html", @@ -82,14 +105,14 @@ def browse_catalog(): init_search=request.args.get("search"), init_category=request.args.get("category"), init_starsonly=request.args.get("starsonly"), - user=session.get('user', {}), + user=session.get("user", {}), catalog=get_catalog(), timestamp_now=int(time.time()), stars=get_stars(), ) -@app.route('/app/') +@app.route("/app/") def app_info(app_id): infos = get_catalog()["apps"].get(app_id) app_folder = os.path.join(config["APPS_CACHE"], app_id) @@ -98,19 +121,29 @@ def app_info(app_id): get_app_md_and_screenshots(app_folder, infos) - return render_template("app.html", locale=get_locale(), user=session.get('user', {}), app_id=app_id, infos=infos, catalog=get_catalog(), stars=get_stars()) + return render_template( + "app.html", + locale=get_locale(), + user=session.get("user", {}), + app_id=app_id, + infos=infos, + catalog=get_catalog(), + stars=get_stars(), + ) -@app.route('/app//') +@app.route("/app//") def star_app(app_id, action): assert action in ["star", "unstar"] if app_id not in get_catalog()["apps"] and app_id not in get_wishlist(): return _("App %(app_id) not found", app_id=app_id), 404 - if not session.get('user', {}): + if not session.get("user", {}): return _("You must be logged in to be able to star an app"), 401 app_star_folder = os.path.join(".stars", app_id) - app_star_for_this_user = os.path.join(".stars", app_id, session.get('user', {})["id"]) + app_star_for_this_user = os.path.join( + ".stars", app_id, session.get("user", {})["id"] + ) if not os.path.exists(app_star_folder): os.mkdir(app_star_folder) @@ -128,7 +161,8 @@ def star_app(app_id, action): else: return redirect("/wishlist") -@app.route('/wishlist') + +@app.route("/wishlist") def browse_wishlist(): return render_template( "wishlist.html", @@ -136,52 +170,89 @@ def browse_wishlist(): init_search=request.args.get("search"), init_starsonly=request.args.get("starsonly"), locale=get_locale(), - user=session.get('user', {}), + user=session.get("user", {}), wishlist=get_wishlist(), - stars=get_stars() + stars=get_stars(), ) -@app.route('/wishlist/add', methods=['GET', 'POST']) +@app.route("/wishlist/add", methods=["GET", "POST"]) def add_to_wishlist(): if request.method == "POST": - - user = session.get('user', {}) + user = session.get("user", {}) if not user: errormsg = _("You must be logged in to submit an app to the wishlist") - return render_template("wishlist_add.html", locale=get_locale(), user=session.get('user', {}), successmsg=None, errormsg=errormsg) + return render_template( + "wishlist_add.html", + locale=get_locale(), + user=session.get("user", {}), + successmsg=None, + errormsg=errormsg, + ) - name = request.form['name'].strip().replace("\n", "") - description = request.form['description'].strip().replace("\n", "") - upstream = request.form['upstream'].strip().replace("\n", "") - website = request.form['website'].strip().replace("\n", "") + name = request.form["name"].strip().replace("\n", "") + description = request.form["description"].strip().replace("\n", "") + upstream = request.form["upstream"].strip().replace("\n", "") + website = request.form["website"].strip().replace("\n", "") checks = [ (len(name) >= 3, _("App name should be at least 3 characters")), (len(name) <= 30, _("App name should be less than 30 characters")), - (len(description) >= 5, _("App description should be at least 5 characters")), - (len(description) <= 100, _("App description should be less than 100 characters")), - (len(upstream) >= 10, _("Upstream code repo URL should be at least 10 characters")), - (len(upstream) <= 150, _("Upstream code repo URL should be less than 150 characters")), + ( + len(description) >= 5, + _("App description should be at least 5 characters"), + ), + ( + len(description) <= 100, + _("App description should be less than 100 characters"), + ), + ( + len(upstream) >= 10, + _("Upstream code repo URL should be at least 10 characters"), + ), + ( + len(upstream) <= 150, + _("Upstream code repo URL should be less than 150 characters"), + ), (len(website) <= 150, _("Website URL should be less than 150 characters")), - (re.match(r"^[\w\.\-\(\)\ ]+$", name), _("App name contains special characters")), + ( + re.match(r"^[\w\.\-\(\)\ ]+$", name), + _("App name contains special characters"), + ), ] for check, errormsg in checks: if not check: - return render_template("wishlist_add.html", locale=get_locale(), user=session.get('user', {}), successmsg=None, errormsg=errormsg) + return render_template( + "wishlist_add.html", + locale=get_locale(), + user=session.get("user", {}), + successmsg=None, + errormsg=errormsg, + ) slug = slugify(name) github = Github(config["GITHUB_TOKEN"]) author = InputGitAuthor(config["GITHUB_LOGIN"], config["GITHUB_EMAIL"]) repo = github.get_repo("Yunohost/apps") - current_wishlist_rawtoml = repo.get_contents("wishlist.toml", ref="app-store") # FIXME : ref=repo.default_branch) + current_wishlist_rawtoml = repo.get_contents( + "wishlist.toml", ref="app-store" + ) # FIXME : ref=repo.default_branch) current_wishlist_sha = current_wishlist_rawtoml.sha current_wishlist_rawtoml = current_wishlist_rawtoml.decoded_content.decode() new_wishlist = toml.loads(current_wishlist_rawtoml) if slug in new_wishlist: - return render_template("wishlist_add.html", locale=get_locale(), user=session.get('user', {}), successmsg=None, errormsg=_("An entry with the name %(slug) already exists in the wishlist", slug=slug)) + return render_template( + "wishlist_add.html", + locale=get_locale(), + user=session.get("user", {}), + successmsg=None, + errormsg=_( + "An entry with the name %(slug) already exists in the wishlist", + slug=slug, + ), + ) new_wishlist[slug] = { "name": name, @@ -195,13 +266,23 @@ def add_to_wishlist(): new_branch = f"add-to-wishlist-{slug}" try: # Get the commit base for the new branch, and create it - commit_sha = repo.get_branch("app-store").commit.sha # FIXME app-store -> repo.default_branch + commit_sha = repo.get_branch( + "app-store" + ).commit.sha # FIXME app-store -> repo.default_branch repo.create_git_ref(ref=f"refs/heads/{new_branch}", sha=commit_sha) except exception as e: print("... Failed to create branch ?") print(e) - errormsg = _("Failed to create the pull request to add the app to the wishlist ... please report the issue to the yunohost team") - return render_template("wishlist_add.html", locale=get_locale(), user=session.get('user', {}), successmsg=None, errormsg=errormsg) + errormsg = _( + "Failed to create the pull request to add the app to the wishlist ... please report the issue to the yunohost team" + ) + return render_template( + "wishlist_add.html", + locale=get_locale(), + user=session.get("user", {}), + successmsg=None, + errormsg=errormsg, + ) message = f"Add {name} to wishlist" repo.update_file( @@ -228,28 +309,50 @@ Proposed by **{session['user']['username']}** # Open the PR pr = repo.create_pull( - title=message, body=body, head=new_branch, base="app-store" # FIXME app-store -> repo.default_branch + title=message, + body=body, + head=new_branch, + base="app-store", # FIXME app-store -> repo.default_branch ) url = f"https://github.com/YunoHost/apps/pull/{pr.number}" - successmsg = _("Your proposed app has succesfully been submitted. It must now be validated by the YunoHost team. You can track progress here: %(url)s", url=url) - return render_template("wishlist_add.html", locale=get_locale(), user=session.get('user', {}), successmsg=successmsg) + successmsg = _( + "Your proposed app has succesfully been submitted. It must now be validated by the YunoHost team. You can track progress here: %(url)s", + url=url, + ) + return render_template( + "wishlist_add.html", + locale=get_locale(), + user=session.get("user", {}), + successmsg=successmsg, + ) else: - return render_template("wishlist_add.html", locale=get_locale(), user=session.get('user', {}), successmsg=None, errormsg=None) + return render_template( + "wishlist_add.html", + locale=get_locale(), + user=session.get("user", {}), + successmsg=None, + errormsg=None, + ) ############################################################################### # Session / SSO using Discourse # ############################################################################### -@app.route('/login_using_discourse') + +@app.route("/login_using_discourse") def login_using_discourse(): """ Send auth request to Discourse: """ - nonce, url, uri_to_redirect_to_after_login = create_nonce_and_build_url_to_login_on_discourse_sso() + ( + nonce, + url, + uri_to_redirect_to_after_login, + ) = create_nonce_and_build_url_to_login_on_discourse_sso() session.clear() session["nonce"] = nonce @@ -259,17 +362,17 @@ def login_using_discourse(): return redirect(url) -@app.route('/sso_login_callback') +@app.route("/sso_login_callback") def sso_login_callback(): - response = base64.b64decode(request.args['sso'].encode()).decode() + response = base64.b64decode(request.args["sso"].encode()).decode() user_data = urllib.parse.parse_qs(response) - if user_data['nonce'][0] != session.get("nonce"): + if user_data["nonce"][0] != session.get("nonce"): return "Invalid nonce", 401 uri_to_redirect_to_after_login = session.get("uri_to_redirect_to_after_login") session.clear() - session['user'] = { + session["user"] = { "id": user_data["external_id"][0], "username": user_data["username"][0], "avatar_url": user_data["avatar_url"][0], @@ -281,7 +384,7 @@ def sso_login_callback(): return redirect("/") -@app.route('/logout') +@app.route("/logout") def logout(): session.clear() @@ -290,9 +393,9 @@ def logout(): referer = request.environ.get("HTTP_REFERER") if referer: if referer.startswith("http://"): - referer = referer[len("http://"):] + referer = referer[len("http://") :] if referer.startswith("https://"): - referer = referer[len("https://"):] + referer = referer[len("https://") :] if "/" not in referer: referer = referer + "/" @@ -308,7 +411,7 @@ def create_nonce_and_build_url_to_login_on_discourse_sso(): Redirect the user to DISCOURSE_ROOT_URL/session/sso_provider?sso=URL_ENCODED_PAYLOAD&sig=HEX_SIGNATURE """ - nonce = ''.join([str(random.randint(0, 9)) for i in range(99)]) + nonce = "".join([str(random.randint(0, 9)) for i in range(99)]) # Only use the current referer URI if it's on the same domain as the current route # to avoid XSS or whatever... @@ -316,9 +419,9 @@ def create_nonce_and_build_url_to_login_on_discourse_sso(): uri_to_redirect_to_after_login = None if referer: if referer.startswith("http://"): - referer = referer[len("http://"):] + referer = referer[len("http://") :] if referer.startswith("https://"): - referer = referer[len("https://"):] + referer = referer[len("https://") :] if "/" not in referer: referer = referer + "/" @@ -326,10 +429,17 @@ def create_nonce_and_build_url_to_login_on_discourse_sso(): if domain == request.environ.get("HTTP_HOST"): uri_to_redirect_to_after_login = uri - url_data = {"nonce": nonce, "return_sso_url": config["CALLBACK_URL_AFTER_LOGIN_ON_DISCOURSE"]} + url_data = { + "nonce": nonce, + "return_sso_url": config["CALLBACK_URL_AFTER_LOGIN_ON_DISCOURSE"], + } url_encoded = urllib.parse.urlencode(url_data) payload = base64.b64encode(url_encoded.encode()).decode() - sig = hmac.new(config["DISCOURSE_SSO_SECRET"].encode(), msg=payload.encode(), digestmod=hashlib.sha256).hexdigest() + sig = hmac.new( + config["DISCOURSE_SSO_SECRET"].encode(), + msg=payload.encode(), + digestmod=hashlib.sha256, + ).hexdigest() data = {"sig": sig, "sso": payload} url = f"{config['DISCOURSE_SSO_ENDPOINT']}?{urllib.parse.urlencode(data)}" diff --git a/store/utils.py b/store/utils.py index b086b8ae..6c341211 100644 --- a/store/utils.py +++ b/store/utils.py @@ -9,22 +9,23 @@ from flask import request AVAILABLE_LANGUAGES = ["en"] + os.listdir("translations") + + def get_locale(): # try to guess the language from the user accept # The best match wins. return request.accept_languages.best_match(AVAILABLE_LANGUAGES) -def get_catalog(): +def get_catalog(): path = "../builds/default/v3/apps.json" mtime = os.path.getmtime(path) if get_catalog.mtime_catalog != mtime: - get_catalog.mtime_catalog = mtime catalog = json.load(open(path)) - catalog['categories'] = {c['id']:c for c in catalog['categories']} - catalog['antifeatures'] = {c['id']:c for c in catalog['antifeatures']} + catalog["categories"] = {c["id"]: c for c in catalog["categories"]} + catalog["antifeatures"] = {c["id"]: c for c in catalog["antifeatures"]} category_color = { "synchronization": "sky", @@ -43,35 +44,38 @@ def get_catalog(): "wat": "teal", } - for id_, category in catalog['categories'].items(): + for id_, category in catalog["categories"].items(): category["color"] = category_color[id_] get_catalog.cache_catalog = catalog return get_catalog.cache_catalog + get_catalog.mtime_catalog = None get_catalog() def get_wishlist(): - path = "../wishlist.toml" mtime = os.path.getmtime(path) if get_wishlist.mtime_wishlist != mtime: - get_wishlist.mtime_wishlist = mtime get_wishlist.cache_wishlist = toml.load(open(path)) return get_wishlist.cache_wishlist + get_wishlist.mtime_wishlist = None get_wishlist() def get_stars(): - - checksum = subprocess.check_output("find . -type f -printf '%T@,' | md5sum", shell=True).decode().split()[0] + checksum = ( + subprocess.check_output("find . -type f -printf '%T@,' | md5sum", shell=True) + .decode() + .split()[0] + ) if get_stars.cache_checksum != checksum: stars = {} for folder, _, files in os.walk(".stars/"): @@ -84,6 +88,7 @@ def get_stars(): return get_stars.cache_stars + get_stars.cache_checksum = None get_stars() @@ -98,9 +103,7 @@ def human_to_binary(size: str) -> int: size = size[:-1] if suffix not in symbols: - raise Exception( - f"Invalid size suffix '{suffix}', expected one of {symbols}" - ) + raise Exception(f"Invalid size suffix '{suffix}', expected one of {symbols}") try: size_ = float(size) @@ -111,10 +114,11 @@ def human_to_binary(size: str) -> int: def get_app_md_and_screenshots(app_folder, infos): - locale = get_locale() - if locale != "en" and os.path.exists(os.path.join(app_folder, "doc", f"DESCRIPTION_{locale}.md")): + if locale != "en" and os.path.exists( + os.path.join(app_folder, "doc", f"DESCRIPTION_{locale}.md") + ): description_path = os.path.join(app_folder, "doc", f"DESCRIPTION_{locale}.md") elif os.path.exists(os.path.join(app_folder, "doc", "DESCRIPTION.md")): description_path = os.path.join(app_folder, "doc", "DESCRIPTION.md") @@ -122,11 +126,15 @@ def get_app_md_and_screenshots(app_folder, infos): description_path = None if description_path: with open(description_path) as f: - infos["full_description_html"] = emojize(pycmarkgfm.gfm_to_html(f.read()), language="alias") + infos["full_description_html"] = emojize( + pycmarkgfm.gfm_to_html(f.read()), language="alias" + ) else: - infos["full_description_html"] = infos['manifest']['description'][locale] + infos["full_description_html"] = infos["manifest"]["description"][locale] - if locale != "en" and os.path.exists(os.path.join(app_folder, "doc", f"PRE_INSTALL_{locale}.md")): + if locale != "en" and os.path.exists( + os.path.join(app_folder, "doc", f"PRE_INSTALL_{locale}.md") + ): pre_install_path = os.path.join(app_folder, "doc", f"PRE_INSTALL_{locale}.md") elif os.path.exists(os.path.join(app_folder, "doc", "PRE_INSTALL.md")): pre_install_path = os.path.join(app_folder, "doc", "PRE_INSTALL.md") @@ -134,7 +142,9 @@ def get_app_md_and_screenshots(app_folder, infos): pre_install_path = None if pre_install_path: with open(pre_install_path) as f: - infos["pre_install_html"] = emojize(pycmarkgfm.gfm_to_html(f.read()), language="alias") + infos["pre_install_html"] = emojize( + pycmarkgfm.gfm_to_html(f.read()), language="alias" + ) infos["screenshot"] = None @@ -153,4 +163,6 @@ def get_app_md_and_screenshots(app_folder, infos): break ram_build_requirement = infos["manifest"]["integration"]["ram"]["build"] - infos["manifest"]["integration"]["ram"]["build_binary"] = human_to_binary(ram_build_requirement) + infos["manifest"]["integration"]["ram"]["build_binary"] = human_to_binary( + ram_build_requirement + )