dynette/app.py

225 lines
7 KiB
Python
Raw Permalink Normal View History

2023-01-19 00:32:49 +01:00
import hmac
import base64
import os
import re
import yaml
import bcrypt
from flask import Flask, jsonify, request
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
2023-04-07 16:51:00 +02:00
from werkzeug.middleware.proxy_fix import ProxyFix
2023-01-30 17:35:08 +01:00
# Syntax check for a lowercase, dot-separated domain name:
#   * a first label, any number of middle labels, then a final label;
#   * every label is made of [a-z0-9-] and neither starts nor ends with "-";
#   * the final label must start with a letter.
# The tail of each label is optional ("?") rather than repeated ("*"): both
# forms accept exactly the same strings, but repeating a group that itself
# starts with a "*" makes the regex engine backtrack exponentially on long
# non-matching input, and this pattern is applied to user-supplied values.
# The pattern ends with \Z instead of $ so a trailing newline is not accepted.
DOMAIN_REGEX = re.compile(
    r"^([a-z0-9]([a-z0-9\-]*[a-z0-9])?)"
    r"(\.[a-z0-9]([a-z0-9\-]*[a-z0-9])?)*"
    r"(\.[a-z]([a-z0-9\-]*[a-z0-9])?)\Z"
)
2023-01-19 00:32:49 +01:00
2023-09-28 16:38:10 +02:00
def trusted_ip():
    """Tell the rate limiter whether the current request is exempt from limits.

    Exempted callers are, for example, the CI, or developers testing new
    developments.

    Returns:
        True when the value of the request's ``X-Forwarded-Host`` header is
        listed in the ``LIMIT_EXEMPTED_IPS`` config entry (an empty list is
        assumed when the entry is absent).
    """
    # NOTE(review): the value checked against the IP allow-list is the
    # X-Forwarded-Host header, not the client address -- presumably the
    # fronting proxy fills it in; confirm against the proxy configuration.
    forwarded_host = request.environ.get("HTTP_X_FORWARDED_HOST")
    exempted = app.config.get("LIMIT_EXEMPTED_IPS", [])
    return forwarded_host in exempted
2023-01-19 00:32:49 +01:00
# The Flask application; its settings are loaded from "config.yml" using the
# safe YAML loader.
app = Flask(__name__)
app.config.from_file("config.yml", load=yaml.safe_load)

# cf. https://flask-limiter.readthedocs.io/en/stable/recipes.html#deploying-an-application-behind-a-proxy
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1)

# Rate limiter keyed on the remote address. Callers accepted by trusted_ip()
# are exempt from both the application-wide and the default limits.
limiter = Limiter(
    get_remote_address,
    app=app,
    default_limits=["50 per hour"],
    default_limits_exempt_when=trusted_ip,
    application_limits_exempt_when=trusted_ip,
    strategy="fixed-window",  # or "moving-window"
    # storage_uri="memory://",  # <- For development
    storage_uri="redis://localhost:6379",
    storage_options={"socket_connect_timeout": 30},
)
2023-01-30 17:35:08 +01:00
# Fail fast at startup when the storage folder is missing. This is an explicit
# check rather than an `assert` because assertions are removed when Python
# runs with -O, which would silently skip the verification.
if not os.path.isdir(app.config["DB_FOLDER"]):
    raise RuntimeError("You should create the DB folder declared in the config")
2023-01-19 00:32:49 +01:00
def _validate_domain(domain):
    """Check that *domain* is a name this server accepts.

    Returns:
        ``None`` when the domain is acceptable, otherwise an
        ``(error_body, 400)`` tuple that a view can return as-is.
    """
    if DOMAIN_REGEX.match(domain) is None:
        return {"error": f"This is not a valid domain: {domain}"}, 400

    # The name must have exactly three labels, and everything after the first
    # label must be one of the configured DOMAINS.
    has_three_labels = domain.count(".") == 2
    if not has_three_labels or domain.partition(".")[2] not in app.config["DOMAINS"]:
        return {"error": "This subdomain is not handled by this dynette server."}, 400

    return None
2023-01-19 00:32:49 +01:00
def _is_available(domain):
    """Return True when no ``<domain>.key`` file exists in the DB folder."""
    key_path = f"{app.config['DB_FOLDER']}/{domain}.key"
    return not os.path.exists(key_path)
2023-01-30 17:35:08 +01:00
@app.route("/")
@limiter.exempt
def home():
    """Serve a fixed greeting on the root URL; exempt from rate limiting."""
    greeting = "Wanna play the dynette?"
    return greeting
2023-01-19 00:32:49 +01:00
2023-01-30 17:35:08 +01:00
@app.route("/domains")
@limiter.exempt
def domains():
    """Return the configured ``DOMAINS`` entry as JSON; exempt from rate limiting."""
    configured_domains = app.config["DOMAINS"]
    return jsonify(configured_domains), 200
2023-01-30 17:35:08 +01:00
@app.route("/test/<domain>")
@limiter.limit("50 per hour", exempt_when=trusted_ip)
def availability(domain):
    """Report whether *domain* can still be registered.

    Returns:
        * the validation error (HTTP 400) when the domain is not acceptable;
        * an error body with HTTP 409 when a key is already stored for it;
        * a JSON-quoted message with HTTP 200 otherwise.
    """
    rejection = _validate_domain(domain)
    if rejection:
        return rejection

    if not _is_available(domain):
        return {"error": f"Subdomain already taken: {domain}"}, 409

    return f'"Domain {domain} is available"', 200
2023-01-30 17:35:08 +01:00
@app.route("/key/<key>", methods=["POST"])
@limiter.limit("5 per hour", exempt_when=trusted_ip)
def register(key):
    """Register a new subdomain together with its key.

    Args:
        key: base64-encoded key taken from the URL; once decoded it must be
            exactly 89 characters long.

    Form fields:
        subdomain: full domain name to register (required).
        recovery_password: optional password, 8 to 1024 characters, stored as
            a base64-encoded bcrypt hash.

    Returns:
        ``'"OK"', 201`` on success, otherwise an error body with status 400
        (bad key, bad request or invalid domain) or 409 (subdomain already
        taken, or recovery password length out of range).
    """
    try:
        key = base64.b64decode(key).decode()
    except ValueError:
        # Raised for malformed base64 as well as for non-UTF-8 payloads.
        return {"error": "Key format is invalid"}, 400
    if len(key) != 89:
        return {"error": "Key format is invalid"}, 400

    try:
        data = dict(request.form)  # get_json(force=True)
        subdomain = data.get("subdomain")
    except Exception:
        subdomain = None
    # Explicit type check rather than `assert`, which is stripped when Python
    # runs with -O and would let a missing subdomain through.
    if not isinstance(subdomain, str):
        return {"error": f"Invalid request: {str(request.form)}"}, 400

    error = _validate_domain(subdomain)
    if error:
        return error

    # Early rejection, before the (slow) password hashing below.
    if not _is_available(subdomain):
        return {"error": f"Subdomain already taken: {subdomain}"}, 409

    recovery_password = data.get("recovery_password")
    if recovery_password and isinstance(recovery_password, str):
        if len(recovery_password) < 8:
            return {"error": "Recovery password too short"}, 409
        if len(recovery_password) > 1024:
            return {"error": "Recovery password too long"}, 409

        # bcrypt only uses the first 72 bytes of a password. Truncate
        # explicitly: older bcrypt releases did so silently, while recent
        # ones raise ValueError on longer input.
        recovery_password = bcrypt.hashpw(
            password=recovery_password.encode()[:72], salt=bcrypt.gensalt(14)
        )
        recovery_password = base64.b64encode(recovery_password).decode()

    db_folder = app.config["DB_FOLDER"]

    # "x" (exclusive creation) fails if the file already exists, so a
    # concurrent request that registered the same subdomain after the
    # availability check above cannot have its key overwritten.
    try:
        with open(f"{db_folder}/{subdomain}.key", "x") as f:
            f.write(key)
    except FileExistsError:
        return {"error": f"Subdomain already taken: {subdomain}"}, 409

    if recovery_password:
        with open(f"{db_folder}/{subdomain}.recovery_password", "w") as f:
            f.write(recovery_password)

    return '"OK"', 201
2023-01-19 00:32:49 +01:00
2023-01-30 17:35:08 +01:00
@app.route("/domains/<subdomain>", methods=["DELETE"])
@limiter.limit("5 per hour", exempt_when=trusted_ip)
def delete_using_recovery_password_or_key(subdomain):
    """Delete a registered subdomain, authenticated by key or recovery password.

    Form fields (at least one is required; ``key`` is used when both are sent):
        key: base64-encoded key, compared with the stored one.
        recovery_password: password checked against the stored bcrypt hash.

    Returns:
        ``'"OK"', 200`` once the stored files are removed, otherwise 400
        (invalid request or domain), 409 (no key stored for the subdomain) or
        403 (wrong or unusable credentials).
    """
    invalid_request = {"error": "Invalid request"}, 400
    denied = '"Access denied"', 403

    try:
        data = dict(request.form)  # get_json(force=True)
    except Exception:
        return invalid_request
    recovery_password = data.get("recovery_password")
    key = data.get("key")

    # Explicit checks rather than `assert`, which is stripped under -O.
    has_recovery_password = isinstance(recovery_password, str) and bool(
        recovery_password
    )
    has_key = isinstance(key, str) and bool(key)
    if not isinstance(subdomain, str) or not (has_recovery_password or has_key):
        return invalid_request

    if key:
        try:
            key = base64.b64decode(key).decode()
        except ValueError:
            # Malformed base64 or a payload that is not valid UTF-8.
            return invalid_request

    error = _validate_domain(subdomain)
    if error:
        return error

    if _is_available(subdomain):
        return {"error": "Subdomain already deleted"}, 409

    db_folder = app.config["DB_FOLDER"]
    key_path = f"{db_folder}/{subdomain}.key"
    recovery_path = f"{db_folder}/{subdomain}.recovery_password"

    if key:
        with open(key_path) as f:
            stored_key = f.read()
        # Compare bytes: hmac.compare_digest() raises TypeError for str
        # arguments containing non-ASCII characters, which a client could
        # otherwise trigger with a crafted key.
        if not hmac.compare_digest(key.encode(), stored_key.encode()):
            return denied
    elif recovery_password:
        try:
            with open(recovery_path) as f:
                hashed = base64.b64decode(f.read())
        except FileNotFoundError:
            # No recovery password is stored for this subdomain.
            return denied
        # bcrypt only uses the first 72 bytes of a password; truncate
        # explicitly because recent bcrypt releases raise ValueError on
        # longer input instead of truncating silently.
        if not bcrypt.checkpw(recovery_password.encode()[:72], hashed):
            return denied
    else:
        # Defensive: reached when the supplied key decodes to an empty string
        # and no recovery password was sent.
        return denied

    # Remove whatever is stored; a file that is already gone is not an error.
    for path in (key_path, recovery_path):
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    return '"OK"', 200
@app.route("/domains/<subdomain>/recovery_password", methods=["PUT"])
@limiter.limit("5 per hour", exempt_when=trusted_ip)
def set_recovery_password_using_key(subdomain):
    """Set or replace a subdomain's recovery password, authenticated by key.

    Form fields (both required):
        key: base64-encoded key, compared with the stored one.
        recovery_password: new password, 8 to 1024 characters, stored as a
            base64-encoded bcrypt hash.

    Returns:
        ``'"OK"', 200`` on success, otherwise 400 (invalid request or
        domain), 404 (no key stored for the subdomain), 403 (wrong key) or
        409 (password length out of range).
    """
    invalid_request = {"error": "Invalid request"}, 400

    try:
        data = dict(request.form)  # get_json(force=True)
    except Exception:
        return invalid_request
    recovery_password = data.get("recovery_password")
    key = data.get("key")

    # Explicit checks rather than `assert`, which is stripped under -O and
    # would let a missing key or password reach the code below.
    if not (
        isinstance(subdomain, str)
        and isinstance(recovery_password, str)
        and recovery_password
        and isinstance(key, str)
        and key
    ):
        return invalid_request

    try:
        key = base64.b64decode(key).decode()
    except ValueError:
        # Malformed base64 or a payload that is not valid UTF-8.
        return invalid_request

    error = _validate_domain(subdomain)
    if error:
        return error

    if _is_available(subdomain):
        return {"error": "Subdomain not registered"}, 404

    db_folder = app.config["DB_FOLDER"]

    with open(f"{db_folder}/{subdomain}.key") as f:
        stored_key = f.read()
    # Compare bytes: hmac.compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which a client could otherwise trigger
    # with a crafted key.
    if not hmac.compare_digest(key.encode(), stored_key.encode()):
        return '"Access denied"', 403

    if len(recovery_password) < 8:
        return {"error": "Recovery password too short"}, 409
    if len(recovery_password) > 1024:
        return {"error": "Recovery password too long"}, 409

    # bcrypt only uses the first 72 bytes of a password. Truncate explicitly:
    # older bcrypt releases did so silently, while recent ones raise
    # ValueError on longer input.
    hashed = bcrypt.hashpw(
        password=recovery_password.encode()[:72], salt=bcrypt.gensalt(14)
    )
    encoded_hash = base64.b64encode(hashed).decode()

    with open(f"{db_folder}/{subdomain}.recovery_password", "w") as f:
        f.write(encoded_hash)

    return '"OK"', 200