dynette/app.py

169 lines
4.8 KiB
Python
Raw Normal View History

2023-01-19 00:32:49 +01:00
import hmac
import base64
import os
import re
import yaml
import bcrypt
from flask import Flask, jsonify, request
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
2023-01-30 17:35:08 +01:00
# Syntactic check for a lowercase, dot-separated domain name:
#   - at least two labels are required (the final dotted label is mandatory);
#   - every label is made of a-z, 0-9 and "-", and both starts and ends with
#     a letter or a digit;
#   - the last label must additionally start with a letter.
# Used by _validate_domain() before any lookup in the configuration.
DOMAIN_REGEX = re.compile(
r"^([a-z0-9]{1}([a-z0-9\-]*[a-z0-9])*)(\.[a-z0-9]{1}([a-z0-9\-]*[a-z0-9])*)*(\.[a-z]{1}([a-z0-9\-]*[a-z0-9])*)$"
)
2023-01-19 00:32:49 +01:00
# Flask application object; its settings are read from "config.yml" and
# parsed with the safe YAML loader.
app = Flask(__name__)
app.config.from_file("config.yml", load=yaml.safe_load)

# Rate limiter attached to the app. Clients are identified through
# flask-limiter's get_remote_address helper, counters use the "memory://"
# storage URI, and routes without their own limit get "50 per hour".
limiter = Limiter(
    get_remote_address,
    app=app,
    storage_uri="memory://",
    default_limits=["50 per hour"],
)

# Checked once at import time: the folder named by DB_FOLDER in the config
# must already exist, since the key / recovery-password files live in it.
assert os.path.isdir(app.config["DB_FOLDER"]), (
    "You should create the DB folder declared in the config"
)
2023-01-19 00:32:49 +01:00
def _validate_domain(domain):
    """Check that ``domain`` is a subdomain this server accepts.

    Returns ``None`` when the domain is acceptable. Otherwise returns an
    ``(error_dict, 400)`` tuple that a view can hand back directly.

    A domain is accepted when it matches ``DOMAIN_REGEX``, has exactly three
    dot-separated labels, and everything after its first label is listed in
    the ``DOMAINS`` config entry.
    """
    if DOMAIN_REGEX.match(domain) is None:
        return {"error": f"This is not a valid domain: {domain}"}, 400

    labels = domain.split(".")
    parent_domain = domain.split(".", 1)[-1]
    if len(labels) == 3 and parent_domain in app.config["DOMAINS"]:
        return None

    return {"error": f"This subdomain is not handled by this dynette server."}, 400
def _is_available(domain):
    """Tell whether ``domain`` is still free.

    A domain counts as taken as soon as ``<DB_FOLDER>/<domain>.key`` exists.
    """
    db_folder = app.config["DB_FOLDER"]
    key_file = f"{db_folder}/{domain}.key"
    return not os.path.exists(key_file)
2023-01-30 17:35:08 +01:00
@app.route("/")
@limiter.exempt
def home():
    """Root endpoint: answer with a fixed greeting (exempt from rate limits)."""
    greeting = "Wanna play the dynette?"
    return greeting
2023-01-19 00:32:49 +01:00
2023-01-30 17:35:08 +01:00
@app.route("/domains")
@limiter.exempt
def domains():
    """Return the ``DOMAINS`` config entry as JSON with status 200.

    This route is exempt from rate limits.
    """
    handled_domains = app.config["DOMAINS"]
    return jsonify(handled_domains), 200
2023-01-30 17:35:08 +01:00
@app.route("/test/<domain>")
@limiter.limit("3 per minute")
def availability(domain):
    """Report whether ``domain`` can still be registered.

    Responses:
        * the validation error from ``_validate_domain`` (400) if the domain
          is not acceptable;
        * 409 with a JSON error when the domain is already taken;
        * 200 with a plain-text message when it is available.
    """
    problem = _validate_domain(domain)
    if problem:
        return problem

    if not _is_available(domain):
        return {"error": f"Subdomain already taken: {domain}"}, 409

    return f"Domain {domain} is available", 200
2023-01-30 17:35:08 +01:00
@app.route("/key/<key>", methods=["POST"])
@limiter.limit("5 per hour")
def register(key):
    """Register a subdomain together with its key.

    URL parameter:
        key: base64 text; once decoded it must be exactly 89 characters long.

    JSON body:
        subdomain (str, required): the domain to register.
        recovery_password (str, optional): 8 to 1024 characters. It is stored
            as a base64-encoded bcrypt hash.

    Responses:
        201 "OK"  - ``<subdomain>.key`` (and ``<subdomain>.recovery_password``
                    when a password was given) were written to DB_FOLDER.
        400       - malformed key, malformed body, or invalid domain.
        409       - subdomain already taken, or recovery password too
                    short / too long.
    """
    try:
        key = base64.b64decode(key).decode()
    except (ValueError, TypeError):
        # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
        return {"error": "Key format is invalid"}, 400
    if len(key) != 89:
        return {"error": "Key format is invalid"}, 400

    # Explicit checks instead of ``assert``: assertions are removed when
    # Python runs with -O, which would let malformed bodies through.
    try:
        data = request.get_json(force=True)
    except Exception:
        # Any parsing failure is reported as a client error.
        return {"error": "Invalid request"}, 400
    if not isinstance(data, dict):
        return {"error": "Invalid request"}, 400
    subdomain = data.get("subdomain")
    if not isinstance(subdomain, str):
        return {"error": "Invalid request"}, 400

    error = _validate_domain(subdomain)
    if error:
        return error

    if not _is_available(subdomain):
        return {"error": f"Subdomain already taken: {subdomain}"}, 409

    # Everything is validated BEFORE anything is written, so a bad recovery
    # password can no longer leave a half-registered domain behind.
    recovery_password = data.get("recovery_password")
    hashed_password = None
    if recovery_password:
        if not isinstance(recovery_password, str):
            # Previously a truthy non-string (e.g. a JSON number) skipped
            # hashing and then crashed while being written to disk, after
            # the key file had already been created.
            return {"error": "Invalid request"}, 400
        if len(recovery_password) < 8:
            return {"error": "Recovery password too short"}, 409
        if len(recovery_password) > 1024:
            return {"error": "Recovery password too long"}, 409

        digest = bcrypt.hashpw(
            password=recovery_password.encode(), salt=bcrypt.gensalt(14)
        )
        hashed_password = base64.b64encode(digest).decode()

    db_folder = app.config["DB_FOLDER"]
    with open(f"{db_folder}/{subdomain}.key", "w") as f:
        f.write(key)

    if hashed_password:
        with open(f"{db_folder}/{subdomain}.recovery_password", "w") as f:
            f.write(hashed_password)

    return "OK", 201
2023-01-30 17:35:08 +01:00
@app.route("/domains/<subdomain>", methods=["DELETE"])
@limiter.limit("5 per hour")
def delete_using_recovery_password_or_key(subdomain):
    """Delete a registered subdomain after checking the caller's credentials.

    JSON body (at least one of the two must be a non-empty string):
        key: base64 text of the key stored at registration.
        recovery_password: the plain recovery password.

    Every credential that is supplied must match; supplying both means both
    are verified.

    Responses:
        200 "OK"            - the key / recovery-password files were removed.
        400                 - malformed body, malformed key, or invalid domain.
        403 "Access denied" - a supplied credential does not match, or a
                              recovery password was given but none is stored.
        409                 - the subdomain is not registered.
    """
    invalid_request = ({"error": "Invalid request"}, 400)

    # Explicit checks instead of ``assert``: assertions are removed when
    # Python runs with -O.
    try:
        data = request.get_json(force=True)
    except Exception:
        # Any parsing failure is reported as a client error.
        return invalid_request
    if not isinstance(data, dict):
        return invalid_request

    recovery_password = data.get("recovery_password")
    key = data.get("key")

    # Falsy values mean "not supplied"; anything supplied must be a string.
    for credential in (recovery_password, key):
        if credential and not isinstance(credential, str):
            return invalid_request

    # Decide which credentials were supplied from the RAW request values.
    # base64.b64decode() drops non-alphabet characters by default, so a key
    # such as " " decodes to "". Re-testing the decoded key's truthiness (as
    # the code used to) skipped the key check and allowed deletion without
    # any credential being verified.
    key_supplied = bool(key)
    password_supplied = bool(recovery_password)
    if not (key_supplied or password_supplied):
        return invalid_request

    if key_supplied:
        try:
            key = base64.b64decode(key).decode()
        except (ValueError, TypeError):
            # binascii.Error and UnicodeDecodeError are ValueError subclasses.
            return invalid_request

    error = _validate_domain(subdomain)
    if error:
        return error

    if _is_available(subdomain):
        return {"error": "Subdomain already deleted"}, 409

    db_folder = app.config["DB_FOLDER"]

    if key_supplied:
        with open(f"{db_folder}/{subdomain}.key") as f:
            stored_key = f.read()
        # Compare as bytes: compare_digest() rejects non-ASCII ``str``
        # arguments with a TypeError, which surfaced as a 500.
        if not hmac.compare_digest(key.encode(), stored_key.encode()):
            return "Access denied", 403

    if password_supplied:
        try:
            with open(f"{db_folder}/{subdomain}.recovery_password") as f:
                hashed = base64.b64decode(f.read())
        except FileNotFoundError:
            # No recovery password was registered for this subdomain.
            return "Access denied", 403
        if not bcrypt.checkpw(recovery_password.encode(), hashed):
            return "Access denied", 403

    # Best-effort removal: a file that is already gone is not an error.
    for suffix in ("key", "recovery_password"):
        try:
            os.remove(f"{db_folder}/{subdomain}.{suffix}")
        except FileNotFoundError:
            pass

    return "OK", 200