dynette/app.py

156 lines
4.7 KiB
Python
Raw Normal View History

2023-01-19 00:32:49 +01:00
import hmac
import base64
import os
import re
import yaml
import bcrypt
from flask import Flask, jsonify, request
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
DOMAIN_REGEX = re.compile(r"^([a-z0-9]{1}([a-z0-9\-]*[a-z0-9])*)(\.[a-z0-9]{1}([a-z0-9\-]*[a-z0-9])*)*(\.[a-z]{1}([a-z0-9\-]*[a-z0-9])*)$")
app = Flask(__name__)
app.config.from_file("config.yml", load=yaml.safe_load)
limiter = Limiter(
get_remote_address,
app=app,
default_limits=["50 per hour"],
storage_uri="memory://",
)
assert os.path.isdir(app.config['DB_FOLDER']), "You should create the DB folder declared in the config"
def _validate_domain(domain):
if not DOMAIN_REGEX.match(domain):
return {"error": f"This is not a valid domain: {domain}"}, 400
if len(domain.split(".")) != 3 or domain.split(".", 1)[-1] not in app.config["DOMAINS"]:
return {"error": f"This subdomain is not handled by this dynette server."}, 400
def _is_available(domain):
return not os.path.exists(f"{app.config['DB_FOLDER']}/{domain}.key")
@app.route('/')
@limiter.exempt
def home():
return 'Wanna play the dynette?'
@app.route('/domains')
@limiter.exempt
def domains():
return jsonify(app.config["DOMAINS"]), 200
@app.route('/test/<domain>')
@limiter.limit("3 per minute")
def availability(domain):
error = _validate_domain(domain)
if error:
return error
if _is_available(domain):
return f"Domain {domain} is available", 200
else:
return {"error": f"Subdomain already taken: {domain}"}, 409
@app.route('/key/<key>', methods=['POST'])
@limiter.limit("5 per hour")
def register(key):
try:
key = base64.b64decode(key).decode()
except Exception as e:
return {"error": "Key format is invalid"}, 400
else:
if len(key) != 89:
return {"error": "Key format is invalid"}, 400
try:
data = request.get_json(force=True)
assert isinstance(data, dict)
subdomain = data.get("subdomain")
assert isinstance(subdomain, str)
except Exception:
return {"error": "Invalid request"}, 400
error = _validate_domain(subdomain)
if error:
return error
if not _is_available(subdomain):
return {"error": f"Subdomain already taken: {subdomain}"}, 409
recovery_password = data.get("recovery_password")
if recovery_password and isinstance(recovery_password, str):
if len(recovery_password) < 8:
return {"error": "Recovery password too short"}, 409
if len(recovery_password) > 1024:
return {"error": "Recovery password too long"}, 409
r_init = recovery_password
recovery_password = bcrypt.hashpw(password=recovery_password.encode(), salt=bcrypt.gensalt(14))
recovery_password = base64.b64encode(recovery_password).decode()
with open(f"{app.config['DB_FOLDER']}/{subdomain}.key", "w") as f:
f.write(key)
if recovery_password:
with open(f"{app.config['DB_FOLDER']}/{subdomain}.recovery_password", "w") as f:
f.write(recovery_password)
return "OK", 201
@app.route('/domains/<subdomain>', methods=['DELETE'])
@limiter.limit("5 per hour")
def delete_using_recovery_password_or_key(subdomain):
try:
assert isinstance(subdomain, str)
data = request.get_json(force=True)
assert isinstance(data, dict)
recovery_password = data.get("recovery_password")
key = data.get("key")
assert (recovery_password and isinstance(recovery_password, str)) \
or (key and isinstance(key, str))
if key:
key = base64.b64decode(key).decode()
except Exception:
return {"error": "Invalid request"}, 400
error = _validate_domain(subdomain)
if error:
return error
if _is_available(subdomain):
return {"error": "Subdomain already deleted"}, 409
if key:
with open(f"{app.config['DB_FOLDER']}/{subdomain}.key") as f:
if not hmac.compare_digest(key, f.read()):
return "Access denied", 403
if recovery_password:
if not os.path.exists(f"{app.config['DB_FOLDER']}/{subdomain}.recovery_password"):
return "Access denied", 403
with open(f"{app.config['DB_FOLDER']}/{subdomain}.recovery_password") as f:
hashed = base64.b64decode(f.read())
if not bcrypt.checkpw(recovery_password.encode(), hashed):
return "Access denied", 403
if os.path.exists(f"{app.config['DB_FOLDER']}/{subdomain}.key"):
os.remove(f"{app.config['DB_FOLDER']}/{subdomain}.key")
if os.path.exists(f"{app.config['DB_FOLDER']}/{subdomain}.recovery_password"):
os.remove(f"{app.config['DB_FOLDER']}/{subdomain}.recovery_password")
return "OK", 200