1
0
Fork 0
mirror of https://github.com/YunoHost/apps.git synced 2024-09-03 20:06:07 +02:00

appstore: small refactoring, proper cache/refresh mechanism for catalog and wishlist

This commit is contained in:
Alexandre Aubin 2023-09-03 12:11:56 +02:00
parent 37330d3d07
commit 56e6f43e41
5 changed files with 213 additions and 157 deletions

0
store/.stars/.gitkeep Normal file
View file

0
store/__init__.py Normal file
View file

View file

@ -1,5 +1,3 @@
import subprocess
import pycmarkgfm
import time
import re
import toml
@ -14,13 +12,9 @@ import sys
from slugify import slugify
from flask import Flask, send_from_directory, render_template, session, redirect, request
from github import Github, InputGitAuthor
from emoji import emojize
from .utils import get_catalog, get_wishlist, get_stars, get_app_md_and_screenshots
locale = "en"
app = Flask(__name__, static_url_path='/assets', static_folder="assets")
catalog = json.load(open("../builds/default/v3/apps.json"))
catalog['categories'] = {c['id']:c for c in catalog['categories']}
catalog['antifeatures'] = {c['id']:c for c in catalog['antifeatures']}
try:
config = toml.loads(open("config.toml").read())
@ -37,7 +31,6 @@ mandatory_config_keys = [
"GITHUB_TOKEN",
"GITHUB_EMAIL",
"APPS_CACHE",
"STARS_DB_FOLDER",
]
for key in mandatory_config_keys:
@ -50,184 +43,58 @@ if config.get("DEBUG"):
app.config["DEBUG"] = True
app.config['TEMPLATES_AUTO_RELOAD'] = True
category_color = {
"synchronization": "sky",
"publishing": "yellow",
"communication": "amber",
"office": "lime",
"productivity_and_management": "purple",
"small_utilities": "",
"reading": "emerald",
"multimedia": "fuchsia",
"social_media": "rose",
"games": "violet",
"dev": "stone",
"system_tools": "white",
"iot": "orange",
"wat": "teal",
}
for id_, category in catalog['categories'].items():
category["color"] = category_color[id_]
wishlist = toml.load(open("../wishlist.toml"))
# This is the secret key used for session signing
app.secret_key = config["COOKIE_SECRET"]
def get_stars():
checksum = subprocess.check_output("find . -type f -printf '%T@,' | md5sum", shell=True).decode().split()[0]
if get_stars.cache_checksum != checksum:
stars = {}
for folder, _, files in os.walk(config["STARS_DB_FOLDER"]):
app_id = folder.split("/")[-1]
if not app_id:
continue
stars[app_id] = set(files)
get_stars.cache_stars = stars
get_stars.cache_checksum = checksum
return get_stars.cache_stars
get_stars.cache_checksum = None
get_stars()
def human_to_binary(size: str) -> int:
symbols = ("K", "M", "G", "T", "P", "E", "Z", "Y")
factor = {}
for i, s in enumerate(symbols):
factor[s] = 1 << (i + 1) * 10
suffix = size[-1]
size = size[:-1]
if suffix not in symbols:
raise YunohostError(
f"Invalid size suffix '{suffix}', expected one of {symbols}"
)
try:
size_ = float(size)
except Exception:
raise YunohostError(f"Failed to convert size {size} to float")
return int(size_ * factor[suffix])
###############################################################################
@app.route('/favicon.ico')
def favicon():
return send_from_directory('assets', 'favicon.png')
@app.route('/login_using_discourse')
def login_using_discourse():
"""
Send auth request to Discourse:
"""
nonce, url = create_nonce_and_build_url_to_login_on_discourse_sso()
session.clear()
session["nonce"] = nonce
print(f"DEBUG: none = {nonce}")
return redirect(url)
@app.route('/sso_login_callback')
def sso_login_callback():
response = base64.b64decode(request.args['sso'].encode()).decode()
user_data = urllib.parse.parse_qs(response)
print("DEBUG: nonce from url args " + user_data['nonce'][0])
print("DEBUG: nonce from session args " + session.get("nonce"))
if user_data['nonce'][0] != session.get("nonce"):
return "Invalid nonce", 401
else:
session.clear()
session['user'] = {
"id": user_data["external_id"][0],
"username": user_data["username"][0],
"avatar_url": user_data["avatar_url"][0],
}
return redirect("/")
@app.route('/logout')
def logout():
session.clear()
return redirect("/")
@app.route('/')
def index():
return render_template("index.html", user=session.get('user', {}), catalog=catalog)
return render_template("index.html", user=session.get('user', {}), catalog=get_catalog())
@app.route('/catalog')
def browse_catalog():
return render_template("catalog.html", init_sort=request.args.get("sort"), init_search=request.args.get("search"), init_category=request.args.get("category"), init_starsonly=request.args.get("starsonly"), user=session.get('user', {}), catalog=catalog, timestamp_now=int(time.time()), stars=get_stars())
return render_template(
"catalog.html",
init_sort=request.args.get("sort"),
init_search=request.args.get("search"),
init_category=request.args.get("category"),
init_starsonly=request.args.get("starsonly"),
user=session.get('user', {}),
catalog=get_catalog(),
timestamp_now=int(time.time()),
stars=get_stars()
)
@app.route('/app/<app_id>')
def app_info(app_id):
infos = catalog["apps"].get(app_id)
infos = get_catalog()["apps"].get(app_id)
app_folder = os.path.join(config["APPS_CACHE"], app_id)
if not infos or not os.path.exists(app_folder):
return f"App {app_id} not found", 404
if os.path.exists(os.path.join(app_folder, "doc", f"DESCRIPTION_{locale}.md")):
description_path = os.path.join(app_folder, "doc", f"DESCRIPTION_{locale}.md")
elif os.path.exists(os.path.join(app_folder, "doc", "DESCRIPTION.md")):
description_path = os.path.join(app_folder, "doc", "DESCRIPTION.md")
else:
description_path = None
if description_path:
with open(description_path) as f:
infos["full_description_html"] = emojize(pycmarkgfm.gfm_to_html(f.read()), language="alias")
else:
infos["full_description_html"] = infos['manifest']['description'][locale]
get_app_md_and_screenshots(app_folder, infos)
if os.path.exists(os.path.join(app_folder, "doc", f"PRE_INSTALL_{locale}.md")):
pre_install_path = os.path.join(app_folder, "doc", f"PRE_INSTALL_{locale}.md")
elif os.path.exists(os.path.join(app_folder, "doc", "PRE_INSTALL.md")):
pre_install_path = os.path.join(app_folder, "doc", "PRE_INSTALL.md")
else:
pre_install_path = None
if pre_install_path:
with open(pre_install_path) as f:
infos["pre_install_html"] = emojize(pycmarkgfm.gfm_to_html(f.read()), language="alias")
infos["screenshot"] = None
screenshots_folder = os.path.join(app_folder, "doc", "screenshots")
if os.path.exists(screenshots_folder):
with os.scandir(screenshots_folder) as it:
for entry in it:
ext = os.path.splitext(entry.name)[1].replace(".", "").lower()
if entry.is_file() and ext in ("png", "jpg", "jpeg", "webp", "gif"):
with open(entry.path, "rb") as img_file:
data = base64.b64encode(img_file.read()).decode("utf-8")
infos[
"screenshot"
] = f"data:image/{ext};charset=utf-8;base64,{data}"
break
ram_build_requirement = infos["manifest"]["integration"]["ram"]["build"]
infos["manifest"]["integration"]["ram"]["build_binary"] = human_to_binary(ram_build_requirement)
return render_template("app.html", user=session.get('user', {}), app_id=app_id, infos=infos, catalog=catalog, stars=get_stars())
return render_template("app.html", user=session.get('user', {}), app_id=app_id, infos=infos, catalog=get_catalog(), stars=get_stars())
@app.route('/app/<app_id>/<action>')
def star_app(app_id, action):
assert action in ["star", "unstar"]
if app_id not in catalog["apps"] and app_id not in wishlist:
if app_id not in get_catalog()["apps"] and app_id not in get_wishlist():
return f"App {app_id} not found", 404
if not session.get('user', {}):
return f"You must be logged in to be able to star an app", 401
app_star_folder = os.path.join(config["STARS_DB_FOLDER"], app_id)
app_star_for_this_user = os.path.join(config["STARS_DB_FOLDER"], app_id, session.get('user', {})["id"])
app_star_folder = os.path.join(".stars", app_id)
app_star_for_this_user = os.path.join(".stars", app_id, session.get('user', {})["id"])
if not os.path.exists(app_star_folder):
os.mkdir(app_star_folder)
@ -240,14 +107,14 @@ def star_app(app_id, action):
except FileNotFoundError:
pass
if app_id in catalog["apps"]:
if app_id in get_catalog()["apps"]:
return redirect(f"/app/{app_id}")
else:
return redirect("/wishlist")
@app.route('/wishlist')
def browse_wishlist():
return render_template("wishlist.html", user=session.get('user', {}), wishlist=wishlist, stars=get_stars())
return render_template("wishlist.html", user=session.get('user', {}), wishlist=get_wishlist(), stars=get_stars())
@app.route('/wishlist/add', methods=['GET', 'POST'])
@ -345,7 +212,49 @@ Proposed by **{session['user']['username']}**
return render_template("wishlist_add.html", user=session.get('user', {}), successmsg=None, errormsg=None)
################################################
###############################################################################
# Session / SSO using Discourse #
###############################################################################
@app.route('/login_using_discourse')
def login_using_discourse():
"""
Send auth request to Discourse:
"""
nonce, url = create_nonce_and_build_url_to_login_on_discourse_sso()
session.clear()
session["nonce"] = nonce
print(f"DEBUG: nonce = {nonce}")
print(f"DEBUG: nonce2 = {session['nonce']}")
return redirect(url)
@app.route('/sso_login_callback')
def sso_login_callback():
response = base64.b64decode(request.args['sso'].encode()).decode()
user_data = urllib.parse.parse_qs(response)
print("DEBUG: nonce from url args: " + user_data['nonce'][0])
print("DEBUG: nonce from session args: " + session.get("nonce", ""))
if user_data['nonce'][0] != session.get("nonce"):
return "Invalid nonce", 401
else:
session.clear()
session['user'] = {
"id": user_data["external_id"][0],
"username": user_data["username"][0],
"avatar_url": user_data["avatar_url"][0],
}
return redirect("/")
@app.route('/logout')
def logout():
session.clear()
return redirect("/")
def create_nonce_and_build_url_to_login_on_discourse_sso():
"""

View file

@ -7,4 +7,3 @@ GITHUB_LOGIN = "yunohost-bot"
GITHUB_EMAIL = "yunohost [at] yunohost.org" # Replace the [at] by actual @
GITHUB_TOKEN = "superSecretToken"
APPS_CACHE = "../.apps_cache/"
STARS_DB_FOLDER = ".stars/"

148
store/utils.py Normal file
View file

@ -0,0 +1,148 @@
import base64
import os
import json
import toml
import subprocess
import pycmarkgfm
from emoji import emojize
def get_catalog():
path = "../builds/default/v3/apps.json"
mtime = os.path.getmtime(path)
if get_catalog.mtime_catalog != mtime:
get_catalog.mtime_catalog = mtime
catalog = json.load(open(path))
catalog['categories'] = {c['id']:c for c in catalog['categories']}
catalog['antifeatures'] = {c['id']:c for c in catalog['antifeatures']}
category_color = {
"synchronization": "sky",
"publishing": "yellow",
"communication": "amber",
"office": "lime",
"productivity_and_management": "purple",
"small_utilities": "",
"reading": "emerald",
"multimedia": "fuchsia",
"social_media": "rose",
"games": "violet",
"dev": "stone",
"system_tools": "white",
"iot": "orange",
"wat": "teal",
}
for id_, category in catalog['categories'].items():
category["color"] = category_color[id_]
get_catalog.cache_catalog = catalog
return get_catalog.cache_catalog
get_catalog.mtime_catalog = None
get_catalog()
def get_wishlist():
path = "../wishlist.toml"
mtime = os.path.getmtime(path)
if get_wishlist.mtime_wishlist != mtime:
get_wishlist.mtime_wishlist = mtime
get_wishlist.cache_wishlist = toml.load(open(path))
return get_wishlist.cache_wishlist
get_wishlist.mtime_wishlist = None
get_wishlist()
def get_stars():
checksum = subprocess.check_output("find . -type f -printf '%T@,' | md5sum", shell=True).decode().split()[0]
if get_stars.cache_checksum != checksum:
stars = {}
for folder, _, files in os.walk(".stars/"):
app_id = folder.split("/")[-1]
if not app_id:
continue
stars[app_id] = set(files)
get_stars.cache_stars = stars
get_stars.cache_checksum = checksum
return get_stars.cache_stars
get_stars.cache_checksum = None
get_stars()
def human_to_binary(size: str) -> int:
symbols = ("K", "M", "G", "T", "P", "E", "Z", "Y")
factor = {}
for i, s in enumerate(symbols):
factor[s] = 1 << (i + 1) * 10
suffix = size[-1]
size = size[:-1]
if suffix not in symbols:
raise Exception(
f"Invalid size suffix '{suffix}', expected one of {symbols}"
)
try:
size_ = float(size)
except Exception:
raise Exception(f"Failed to convert size {size} to float")
return int(size_ * factor[suffix])
def get_app_md_and_screenshots(app_folder, infos):
locale = "en" # FIXME, deduce locale code from request
if os.path.exists(os.path.join(app_folder, "doc", f"DESCRIPTION_{locale}.md")):
description_path = os.path.join(app_folder, "doc", f"DESCRIPTION_{locale}.md")
elif os.path.exists(os.path.join(app_folder, "doc", "DESCRIPTION.md")):
description_path = os.path.join(app_folder, "doc", "DESCRIPTION.md")
else:
description_path = None
if description_path:
with open(description_path) as f:
infos["full_description_html"] = emojize(pycmarkgfm.gfm_to_html(f.read()), language="alias")
else:
infos["full_description_html"] = infos['manifest']['description'][locale]
if os.path.exists(os.path.join(app_folder, "doc", f"PRE_INSTALL_{locale}.md")):
pre_install_path = os.path.join(app_folder, "doc", f"PRE_INSTALL_{locale}.md")
elif os.path.exists(os.path.join(app_folder, "doc", "PRE_INSTALL.md")):
pre_install_path = os.path.join(app_folder, "doc", "PRE_INSTALL.md")
else:
pre_install_path = None
if pre_install_path:
with open(pre_install_path) as f:
infos["pre_install_html"] = emojize(pycmarkgfm.gfm_to_html(f.read()), language="alias")
infos["screenshot"] = None
screenshots_folder = os.path.join(app_folder, "doc", "screenshots")
if os.path.exists(screenshots_folder):
with os.scandir(screenshots_folder) as it:
for entry in it:
ext = os.path.splitext(entry.name)[1].replace(".", "").lower()
if entry.is_file() and ext in ("png", "jpg", "jpeg", "webp", "gif"):
with open(entry.path, "rb") as img_file:
data = base64.b64encode(img_file.read()).decode("utf-8")
infos[
"screenshot"
] = f"data:image/{ext};charset=utf-8;base64,{data}"
break
ram_build_requirement = infos["manifest"]["integration"]["ram"]["build"]
infos["manifest"]["integration"]["ram"]["build_binary"] = human_to_binary(ram_build_requirement)