1
0
Fork 0
mirror of https://github.com/YunoHost/apps.git synced 2024-09-03 20:06:07 +02:00

appstore: black app.py and utils.py

This commit is contained in:
Alexandre Aubin 2023-09-19 01:15:01 +02:00
parent fd6f0eb24c
commit 803f379c81
2 changed files with 198 additions and 76 deletions

View file

@ -10,18 +10,33 @@ import urllib
import json
import sys
from slugify import slugify
from flask import Flask, send_from_directory, render_template, session, redirect, request
from flask import (
Flask,
send_from_directory,
render_template,
session,
redirect,
request,
)
from flask_babel import Babel
from flask_babel import gettext as _
from github import Github, InputGitAuthor
from .utils import get_locale, get_catalog, get_wishlist, get_stars, get_app_md_and_screenshots
from .utils import (
get_locale,
get_catalog,
get_wishlist,
get_stars,
get_app_md_and_screenshots,
)
app = Flask(__name__, static_url_path='/assets', static_folder="assets")
app = Flask(__name__, static_url_path="/assets", static_folder="assets")
try:
config = toml.loads(open("config.toml").read())
except Exception as e:
print("You should create a config.toml with the appropriate key/values, cf config.toml.example")
print(
"You should create a config.toml with the appropriate key/values, cf config.toml.example"
)
sys.exit(1)
mandatory_config_keys = [
@ -43,14 +58,15 @@ for key in mandatory_config_keys:
if config.get("DEBUG"):
app.debug = True
app.config["DEBUG"] = True
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config["TEMPLATES_AUTO_RELOAD"] = True
# This is the secret key used for session signing
app.secret_key = config["COOKIE_SECRET"]
babel = Babel(app, locale_selector=get_locale)
@app.template_filter('localize')
@app.template_filter("localize")
def localize(d):
if not isinstance(d, dict):
return d
@ -61,19 +77,26 @@ def localize(d):
else:
return d["en"]
###############################################################################
@app.route('/favicon.ico')
@app.route("/favicon.ico")
def favicon():
return send_from_directory('assets', 'favicon.png')
return send_from_directory("assets", "favicon.png")
@app.route('/')
@app.route("/")
def index():
return render_template("index.html", locale=get_locale(), user=session.get('user', {}), catalog=get_catalog())
return render_template(
"index.html",
locale=get_locale(),
user=session.get("user", {}),
catalog=get_catalog(),
)
@app.route('/catalog')
@app.route("/catalog")
def browse_catalog():
return render_template(
"catalog.html",
@ -82,14 +105,14 @@ def browse_catalog():
init_search=request.args.get("search"),
init_category=request.args.get("category"),
init_starsonly=request.args.get("starsonly"),
user=session.get('user', {}),
user=session.get("user", {}),
catalog=get_catalog(),
timestamp_now=int(time.time()),
stars=get_stars(),
)
@app.route('/app/<app_id>')
@app.route("/app/<app_id>")
def app_info(app_id):
infos = get_catalog()["apps"].get(app_id)
app_folder = os.path.join(config["APPS_CACHE"], app_id)
@ -98,19 +121,29 @@ def app_info(app_id):
get_app_md_and_screenshots(app_folder, infos)
return render_template("app.html", locale=get_locale(), user=session.get('user', {}), app_id=app_id, infos=infos, catalog=get_catalog(), stars=get_stars())
return render_template(
"app.html",
locale=get_locale(),
user=session.get("user", {}),
app_id=app_id,
infos=infos,
catalog=get_catalog(),
stars=get_stars(),
)
@app.route('/app/<app_id>/<action>')
@app.route("/app/<app_id>/<action>")
def star_app(app_id, action):
assert action in ["star", "unstar"]
if app_id not in get_catalog()["apps"] and app_id not in get_wishlist():
return _("App %(app_id) not found", app_id=app_id), 404
if not session.get('user', {}):
if not session.get("user", {}):
return _("You must be logged in to be able to star an app"), 401
app_star_folder = os.path.join(".stars", app_id)
app_star_for_this_user = os.path.join(".stars", app_id, session.get('user', {})["id"])
app_star_for_this_user = os.path.join(
".stars", app_id, session.get("user", {})["id"]
)
if not os.path.exists(app_star_folder):
os.mkdir(app_star_folder)
@ -128,7 +161,8 @@ def star_app(app_id, action):
else:
return redirect("/wishlist")
@app.route('/wishlist')
@app.route("/wishlist")
def browse_wishlist():
return render_template(
"wishlist.html",
@ -136,52 +170,89 @@ def browse_wishlist():
init_search=request.args.get("search"),
init_starsonly=request.args.get("starsonly"),
locale=get_locale(),
user=session.get('user', {}),
user=session.get("user", {}),
wishlist=get_wishlist(),
stars=get_stars()
stars=get_stars(),
)
@app.route('/wishlist/add', methods=['GET', 'POST'])
@app.route("/wishlist/add", methods=["GET", "POST"])
def add_to_wishlist():
if request.method == "POST":
user = session.get('user', {})
user = session.get("user", {})
if not user:
errormsg = _("You must be logged in to submit an app to the wishlist")
return render_template("wishlist_add.html", locale=get_locale(), user=session.get('user', {}), successmsg=None, errormsg=errormsg)
return render_template(
"wishlist_add.html",
locale=get_locale(),
user=session.get("user", {}),
successmsg=None,
errormsg=errormsg,
)
name = request.form['name'].strip().replace("\n", "")
description = request.form['description'].strip().replace("\n", "")
upstream = request.form['upstream'].strip().replace("\n", "")
website = request.form['website'].strip().replace("\n", "")
name = request.form["name"].strip().replace("\n", "")
description = request.form["description"].strip().replace("\n", "")
upstream = request.form["upstream"].strip().replace("\n", "")
website = request.form["website"].strip().replace("\n", "")
checks = [
(len(name) >= 3, _("App name should be at least 3 characters")),
(len(name) <= 30, _("App name should be less than 30 characters")),
(len(description) >= 5, _("App description should be at least 5 characters")),
(len(description) <= 100, _("App description should be less than 100 characters")),
(len(upstream) >= 10, _("Upstream code repo URL should be at least 10 characters")),
(len(upstream) <= 150, _("Upstream code repo URL should be less than 150 characters")),
(
len(description) >= 5,
_("App description should be at least 5 characters"),
),
(
len(description) <= 100,
_("App description should be less than 100 characters"),
),
(
len(upstream) >= 10,
_("Upstream code repo URL should be at least 10 characters"),
),
(
len(upstream) <= 150,
_("Upstream code repo URL should be less than 150 characters"),
),
(len(website) <= 150, _("Website URL should be less than 150 characters")),
(re.match(r"^[\w\.\-\(\)\ ]+$", name), _("App name contains special characters")),
(
re.match(r"^[\w\.\-\(\)\ ]+$", name),
_("App name contains special characters"),
),
]
for check, errormsg in checks:
if not check:
return render_template("wishlist_add.html", locale=get_locale(), user=session.get('user', {}), successmsg=None, errormsg=errormsg)
return render_template(
"wishlist_add.html",
locale=get_locale(),
user=session.get("user", {}),
successmsg=None,
errormsg=errormsg,
)
slug = slugify(name)
github = Github(config["GITHUB_TOKEN"])
author = InputGitAuthor(config["GITHUB_LOGIN"], config["GITHUB_EMAIL"])
repo = github.get_repo("Yunohost/apps")
current_wishlist_rawtoml = repo.get_contents("wishlist.toml", ref="app-store") # FIXME : ref=repo.default_branch)
current_wishlist_rawtoml = repo.get_contents(
"wishlist.toml", ref="app-store"
) # FIXME : ref=repo.default_branch)
current_wishlist_sha = current_wishlist_rawtoml.sha
current_wishlist_rawtoml = current_wishlist_rawtoml.decoded_content.decode()
new_wishlist = toml.loads(current_wishlist_rawtoml)
if slug in new_wishlist:
return render_template("wishlist_add.html", locale=get_locale(), user=session.get('user', {}), successmsg=None, errormsg=_("An entry with the name %(slug) already exists in the wishlist", slug=slug))
return render_template(
"wishlist_add.html",
locale=get_locale(),
user=session.get("user", {}),
successmsg=None,
errormsg=_(
"An entry with the name %(slug) already exists in the wishlist",
slug=slug,
),
)
new_wishlist[slug] = {
"name": name,
@ -195,13 +266,23 @@ def add_to_wishlist():
new_branch = f"add-to-wishlist-{slug}"
try:
# Get the commit base for the new branch, and create it
commit_sha = repo.get_branch("app-store").commit.sha # FIXME app-store -> repo.default_branch
commit_sha = repo.get_branch(
"app-store"
).commit.sha # FIXME app-store -> repo.default_branch
repo.create_git_ref(ref=f"refs/heads/{new_branch}", sha=commit_sha)
except exception as e:
print("... Failed to create branch ?")
print(e)
errormsg = _("Failed to create the pull request to add the app to the wishlist ... please report the issue to the yunohost team")
return render_template("wishlist_add.html", locale=get_locale(), user=session.get('user', {}), successmsg=None, errormsg=errormsg)
errormsg = _(
"Failed to create the pull request to add the app to the wishlist ... please report the issue to the yunohost team"
)
return render_template(
"wishlist_add.html",
locale=get_locale(),
user=session.get("user", {}),
successmsg=None,
errormsg=errormsg,
)
message = f"Add {name} to wishlist"
repo.update_file(
@ -228,28 +309,50 @@ Proposed by **{session['user']['username']}**
# Open the PR
pr = repo.create_pull(
title=message, body=body, head=new_branch, base="app-store" # FIXME app-store -> repo.default_branch
title=message,
body=body,
head=new_branch,
base="app-store", # FIXME app-store -> repo.default_branch
)
url = f"https://github.com/YunoHost/apps/pull/{pr.number}"
successmsg = _("Your proposed app has succesfully been submitted. It must now be validated by the YunoHost team. You can track progress here: %(url)s", url=url)
return render_template("wishlist_add.html", locale=get_locale(), user=session.get('user', {}), successmsg=successmsg)
successmsg = _(
"Your proposed app has succesfully been submitted. It must now be validated by the YunoHost team. You can track progress here: %(url)s",
url=url,
)
return render_template(
"wishlist_add.html",
locale=get_locale(),
user=session.get("user", {}),
successmsg=successmsg,
)
else:
return render_template("wishlist_add.html", locale=get_locale(), user=session.get('user', {}), successmsg=None, errormsg=None)
return render_template(
"wishlist_add.html",
locale=get_locale(),
user=session.get("user", {}),
successmsg=None,
errormsg=None,
)
###############################################################################
# Session / SSO using Discourse #
###############################################################################
@app.route('/login_using_discourse')
@app.route("/login_using_discourse")
def login_using_discourse():
"""
Send auth request to Discourse:
"""
nonce, url, uri_to_redirect_to_after_login = create_nonce_and_build_url_to_login_on_discourse_sso()
(
nonce,
url,
uri_to_redirect_to_after_login,
) = create_nonce_and_build_url_to_login_on_discourse_sso()
session.clear()
session["nonce"] = nonce
@ -259,17 +362,17 @@ def login_using_discourse():
return redirect(url)
@app.route('/sso_login_callback')
@app.route("/sso_login_callback")
def sso_login_callback():
response = base64.b64decode(request.args['sso'].encode()).decode()
response = base64.b64decode(request.args["sso"].encode()).decode()
user_data = urllib.parse.parse_qs(response)
if user_data['nonce'][0] != session.get("nonce"):
if user_data["nonce"][0] != session.get("nonce"):
return "Invalid nonce", 401
uri_to_redirect_to_after_login = session.get("uri_to_redirect_to_after_login")
session.clear()
session['user'] = {
session["user"] = {
"id": user_data["external_id"][0],
"username": user_data["username"][0],
"avatar_url": user_data["avatar_url"][0],
@ -281,7 +384,7 @@ def sso_login_callback():
return redirect("/")
@app.route('/logout')
@app.route("/logout")
def logout():
session.clear()
@ -290,9 +393,9 @@ def logout():
referer = request.environ.get("HTTP_REFERER")
if referer:
if referer.startswith("http://"):
referer = referer[len("http://"):]
referer = referer[len("http://") :]
if referer.startswith("https://"):
referer = referer[len("https://"):]
referer = referer[len("https://") :]
if "/" not in referer:
referer = referer + "/"
@ -308,7 +411,7 @@ def create_nonce_and_build_url_to_login_on_discourse_sso():
Redirect the user to DISCOURSE_ROOT_URL/session/sso_provider?sso=URL_ENCODED_PAYLOAD&sig=HEX_SIGNATURE
"""
nonce = ''.join([str(random.randint(0, 9)) for i in range(99)])
nonce = "".join([str(random.randint(0, 9)) for i in range(99)])
# Only use the current referer URI if it's on the same domain as the current route
# to avoid XSS or whatever...
@ -316,9 +419,9 @@ def create_nonce_and_build_url_to_login_on_discourse_sso():
uri_to_redirect_to_after_login = None
if referer:
if referer.startswith("http://"):
referer = referer[len("http://"):]
referer = referer[len("http://") :]
if referer.startswith("https://"):
referer = referer[len("https://"):]
referer = referer[len("https://") :]
if "/" not in referer:
referer = referer + "/"
@ -326,10 +429,17 @@ def create_nonce_and_build_url_to_login_on_discourse_sso():
if domain == request.environ.get("HTTP_HOST"):
uri_to_redirect_to_after_login = uri
url_data = {"nonce": nonce, "return_sso_url": config["CALLBACK_URL_AFTER_LOGIN_ON_DISCOURSE"]}
url_data = {
"nonce": nonce,
"return_sso_url": config["CALLBACK_URL_AFTER_LOGIN_ON_DISCOURSE"],
}
url_encoded = urllib.parse.urlencode(url_data)
payload = base64.b64encode(url_encoded.encode()).decode()
sig = hmac.new(config["DISCOURSE_SSO_SECRET"].encode(), msg=payload.encode(), digestmod=hashlib.sha256).hexdigest()
sig = hmac.new(
config["DISCOURSE_SSO_SECRET"].encode(),
msg=payload.encode(),
digestmod=hashlib.sha256,
).hexdigest()
data = {"sig": sig, "sso": payload}
url = f"{config['DISCOURSE_SSO_ENDPOINT']}?{urllib.parse.urlencode(data)}"

View file

@ -9,22 +9,23 @@ from flask import request
AVAILABLE_LANGUAGES = ["en"] + os.listdir("translations")
def get_locale():
# try to guess the language from the user accept
# The best match wins.
return request.accept_languages.best_match(AVAILABLE_LANGUAGES)
def get_catalog():
def get_catalog():
path = "../builds/default/v3/apps.json"
mtime = os.path.getmtime(path)
if get_catalog.mtime_catalog != mtime:
get_catalog.mtime_catalog = mtime
catalog = json.load(open(path))
catalog['categories'] = {c['id']:c for c in catalog['categories']}
catalog['antifeatures'] = {c['id']:c for c in catalog['antifeatures']}
catalog["categories"] = {c["id"]: c for c in catalog["categories"]}
catalog["antifeatures"] = {c["id"]: c for c in catalog["antifeatures"]}
category_color = {
"synchronization": "sky",
@ -43,35 +44,38 @@ def get_catalog():
"wat": "teal",
}
for id_, category in catalog['categories'].items():
for id_, category in catalog["categories"].items():
category["color"] = category_color[id_]
get_catalog.cache_catalog = catalog
return get_catalog.cache_catalog
get_catalog.mtime_catalog = None
get_catalog()
def get_wishlist():
path = "../wishlist.toml"
mtime = os.path.getmtime(path)
if get_wishlist.mtime_wishlist != mtime:
get_wishlist.mtime_wishlist = mtime
get_wishlist.cache_wishlist = toml.load(open(path))
return get_wishlist.cache_wishlist
get_wishlist.mtime_wishlist = None
get_wishlist()
def get_stars():
checksum = subprocess.check_output("find . -type f -printf '%T@,' | md5sum", shell=True).decode().split()[0]
checksum = (
subprocess.check_output("find . -type f -printf '%T@,' | md5sum", shell=True)
.decode()
.split()[0]
)
if get_stars.cache_checksum != checksum:
stars = {}
for folder, _, files in os.walk(".stars/"):
@ -84,6 +88,7 @@ def get_stars():
return get_stars.cache_stars
get_stars.cache_checksum = None
get_stars()
@ -98,9 +103,7 @@ def human_to_binary(size: str) -> int:
size = size[:-1]
if suffix not in symbols:
raise Exception(
f"Invalid size suffix '{suffix}', expected one of {symbols}"
)
raise Exception(f"Invalid size suffix '{suffix}', expected one of {symbols}")
try:
size_ = float(size)
@ -111,10 +114,11 @@ def human_to_binary(size: str) -> int:
def get_app_md_and_screenshots(app_folder, infos):
locale = get_locale()
if locale != "en" and os.path.exists(os.path.join(app_folder, "doc", f"DESCRIPTION_{locale}.md")):
if locale != "en" and os.path.exists(
os.path.join(app_folder, "doc", f"DESCRIPTION_{locale}.md")
):
description_path = os.path.join(app_folder, "doc", f"DESCRIPTION_{locale}.md")
elif os.path.exists(os.path.join(app_folder, "doc", "DESCRIPTION.md")):
description_path = os.path.join(app_folder, "doc", "DESCRIPTION.md")
@ -122,11 +126,15 @@ def get_app_md_and_screenshots(app_folder, infos):
description_path = None
if description_path:
with open(description_path) as f:
infos["full_description_html"] = emojize(pycmarkgfm.gfm_to_html(f.read()), language="alias")
infos["full_description_html"] = emojize(
pycmarkgfm.gfm_to_html(f.read()), language="alias"
)
else:
infos["full_description_html"] = infos['manifest']['description'][locale]
infos["full_description_html"] = infos["manifest"]["description"][locale]
if locale != "en" and os.path.exists(os.path.join(app_folder, "doc", f"PRE_INSTALL_{locale}.md")):
if locale != "en" and os.path.exists(
os.path.join(app_folder, "doc", f"PRE_INSTALL_{locale}.md")
):
pre_install_path = os.path.join(app_folder, "doc", f"PRE_INSTALL_{locale}.md")
elif os.path.exists(os.path.join(app_folder, "doc", "PRE_INSTALL.md")):
pre_install_path = os.path.join(app_folder, "doc", "PRE_INSTALL.md")
@ -134,7 +142,9 @@ def get_app_md_and_screenshots(app_folder, infos):
pre_install_path = None
if pre_install_path:
with open(pre_install_path) as f:
infos["pre_install_html"] = emojize(pycmarkgfm.gfm_to_html(f.read()), language="alias")
infos["pre_install_html"] = emojize(
pycmarkgfm.gfm_to_html(f.read()), language="alias"
)
infos["screenshot"] = None
@ -153,4 +163,6 @@ def get_app_md_and_screenshots(app_folder, infos):
break
ram_build_requirement = infos["manifest"]["integration"]["ram"]["build"]
infos["manifest"]["integration"]["ram"]["build_binary"] = human_to_binary(ram_build_requirement)
infos["manifest"]["integration"]["ram"]["build_binary"] = human_to_binary(
ram_build_requirement
)