CmdForge/src/smarttools/web/auth.py

104 lines
3.4 KiB
Python

"""Web UI authentication routes."""
from __future__ import annotations
from typing import Dict
from flask import current_app, redirect, render_template, request, session, url_for
from . import web_bp
def _api_post(path: str, payload: Dict) -> Dict:
client = current_app.test_client()
response = client.post(path, json=payload)
return {
"status": response.status_code,
"data": response.get_json(silent=True) or {},
}
def _csrf_token() -> str:
token = session.get("csrf_token")
if not token:
token = current_app.config["CSRF_GENERATOR"]()
session["csrf_token"] = token
return token
def _validate_csrf() -> bool:
form_token = request.form.get("csrf_token", "")
session_token = session.get("csrf_token", "")
return bool(form_token and session_token and form_token == session_token)
@web_bp.route("/login", methods=["GET", "POST"])
def login():
next_url = request.args.get("next") or request.form.get("next")
if request.method == "POST":
if not _validate_csrf():
return render_template(
"pages/login.html",
errors=["Invalid CSRF token"],
next_url=next_url,
)
email = request.form.get("email", "").strip()
password = request.form.get("password", "")
result = _api_post("/api/v1/login", {"email": email, "password": password})
if result["status"] == 200:
data = result["data"].get("data", {})
session.clear()
session["auth_token"] = data.get("token")
session["publisher"] = data.get("publisher", {})
session["user"] = data.get("publisher", {})
current_app.session_interface.rotate_session(session)
if next_url and next_url.startswith("/"):
return redirect(next_url)
return redirect(url_for("web.dashboard"))
error = result["data"].get("error", {}).get("message", "Login failed")
return render_template(
"pages/login.html",
errors=[error],
email=email,
next_url=next_url,
)
return render_template("pages/login.html", next_url=next_url)
@web_bp.route("/register", methods=["GET", "POST"])
def register():
if request.method == "POST":
if not _validate_csrf():
return render_template(
"pages/register.html",
errors=["Invalid CSRF token"],
)
payload = {
"email": request.form.get("email", "").strip(),
"password": request.form.get("password", ""),
"slug": request.form.get("slug", "").strip(),
"display_name": request.form.get("display_name", "").strip(),
}
result = _api_post("/api/v1/register", payload)
if result["status"] == 201:
return redirect(url_for("web.login"))
error = result["data"].get("error", {}).get("message", "Registration failed")
return render_template(
"pages/register.html",
errors=[error],
email=payload["email"],
slug=payload["slug"],
display_name=payload["display_name"],
)
return render_template("pages/register.html")
@web_bp.route("/logout", methods=["POST"])
def logout():
if not _validate_csrf():
return redirect(url_for("web.login"))
session.clear()
return redirect(url_for("web.login"))