Add password reset flow with email-based token verification

Implement secure password reset functionality:
- Add password_reset_tokens table for storing hashed reset tokens
- Create email utility module (dev mode logs to console)
- Add API endpoints: request, validate, and complete password reset
- Add web routes and templates for forgot-password and reset-password
- Security: 1-hour token expiry, single-use, rate limiting, session invalidation
- Prevent email enumeration by always returning success message

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
rob 2026-01-17 04:34:43 -04:00
parent 205cd8d9cf
commit 05a2fae94c
7 changed files with 577 additions and 7 deletions

View File

@ -1741,6 +1741,177 @@ def create_app() -> Flask:
}
})
@app.route("/api/v1/password-reset/request", methods=["POST"])
def request_password_reset() -> Response:
"""Request a password reset email.
Always returns success to prevent email enumeration.
"""
if request.content_length and request.content_length > MAX_BODY_BYTES:
return error_response("PAYLOAD_TOO_LARGE", "Request body exceeds 512KB limit", 413)
# Rate limit by IP
ip = request.headers.get("X-Forwarded-For", request.remote_addr or "unknown")
rate_key = f"{ip}:password_reset"
allowed, _ = rate_limiter.check(rate_key, 5, 3600) # 5 requests per hour per IP
if not allowed:
return error_response("RATE_LIMITED", "Too many password reset requests. Try again later.", 429)
payload = request.get_json(silent=True) or {}
email = (payload.get("email") or "").strip().lower()
# Always return success message (prevent email enumeration)
success_response = jsonify({
"data": {
"status": "success",
"message": "If an account with that email exists, a password reset link has been sent.",
}
})
if not email or not EMAIL_RE.match(email):
return success_response
publisher = query_one(g.db, "SELECT id, email FROM publishers WHERE email = ?", [email])
if not publisher:
return success_response
# Rate limit by email as well
email_rate_key = f"email:{email}:password_reset"
email_allowed, _ = rate_limiter.check(email_rate_key, 3, 3600) # 3 requests per hour per email
if not email_allowed:
return success_response # Still return success to prevent enumeration
# Invalidate any existing unused tokens for this publisher
g.db.execute(
"UPDATE password_reset_tokens SET used_at = CURRENT_TIMESTAMP WHERE publisher_id = ? AND used_at IS NULL",
[publisher["id"]],
)
# Generate a secure token
token, token_hash = generate_token()
expires_at = datetime.utcnow() + timedelta(hours=1)
g.db.execute(
"""
INSERT INTO password_reset_tokens (publisher_id, token_hash, expires_at)
VALUES (?, ?, ?)
""",
[publisher["id"], token_hash, expires_at.isoformat()],
)
g.db.commit()
# Send email (logs to console in dev mode)
from cmdforge.web.email import send_password_reset_email
base_url = request.url_root.rstrip("/")
send_password_reset_email(publisher["email"], token, base_url)
return success_response
@app.route("/api/v1/password-reset/validate", methods=["POST"])
def validate_reset_token() -> Response:
"""Validate a password reset token (optional, for UX)."""
payload = request.get_json(silent=True) or {}
token = payload.get("token", "")
if not token:
return error_response("VALIDATION_ERROR", "Token is required", 400)
token_hash = hashlib.sha256(token.encode()).hexdigest()
row = query_one(
g.db,
"""
SELECT id, expires_at, used_at
FROM password_reset_tokens
WHERE token_hash = ?
""",
[token_hash],
)
if not row:
return error_response("INVALID_TOKEN", "Invalid or expired reset token", 400)
if row["used_at"]:
return error_response("TOKEN_USED", "This reset token has already been used", 400)
try:
expires_at = datetime.fromisoformat(row["expires_at"])
if datetime.utcnow() > expires_at:
return error_response("TOKEN_EXPIRED", "This reset token has expired", 400)
except ValueError:
return error_response("INVALID_TOKEN", "Invalid token data", 400)
return jsonify({"data": {"valid": True}})
@app.route("/api/v1/password-reset/complete", methods=["POST"])
def complete_password_reset() -> Response:
"""Complete password reset with token and new password."""
if request.content_length and request.content_length > MAX_BODY_BYTES:
return error_response("PAYLOAD_TOO_LARGE", "Request body exceeds 512KB limit", 413)
payload = request.get_json(silent=True) or {}
token = payload.get("token", "")
new_password = payload.get("new_password", "")
if not token:
return error_response("VALIDATION_ERROR", "Token is required", 400)
if not new_password or len(new_password) < 8:
return error_response("VALIDATION_ERROR", "Password must be at least 8 characters", 400)
token_hash = hashlib.sha256(token.encode()).hexdigest()
row = query_one(
g.db,
"""
SELECT prt.id, prt.publisher_id, prt.expires_at, prt.used_at, p.email
FROM password_reset_tokens prt
JOIN publishers p ON prt.publisher_id = p.id
WHERE prt.token_hash = ?
""",
[token_hash],
)
if not row:
return error_response("INVALID_TOKEN", "Invalid or expired reset token", 400)
if row["used_at"]:
return error_response("TOKEN_USED", "This reset token has already been used", 400)
try:
expires_at = datetime.fromisoformat(row["expires_at"])
if datetime.utcnow() > expires_at:
return error_response("TOKEN_EXPIRED", "This reset token has expired", 400)
except ValueError:
return error_response("INVALID_TOKEN", "Invalid token data", 400)
# Hash and save new password
new_hash = password_hasher.hash(new_password)
# Update password
g.db.execute(
"UPDATE publishers SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
[new_hash, row["publisher_id"]],
)
# Mark token as used
g.db.execute(
"UPDATE password_reset_tokens SET used_at = CURRENT_TIMESTAMP WHERE id = ?",
[row["id"]],
)
# Invalidate all existing sessions/tokens for this user
g.db.execute(
"UPDATE api_tokens SET revoked_at = CURRENT_TIMESTAMP WHERE publisher_id = ? AND revoked_at IS NULL",
[row["publisher_id"]],
)
g.db.commit()
return jsonify({
"data": {
"status": "success",
"message": "Password has been reset successfully. Please log in with your new password.",
}
})
@app.route("/api/v1/tokens", methods=["POST"])
@require_token
def create_token() -> Response:

View File

@ -399,6 +399,19 @@ CREATE TABLE IF NOT EXISTS registry_settings (
);
CREATE INDEX IF NOT EXISTS idx_settings_category ON registry_settings(category);
-- Password Reset Tokens
CREATE TABLE IF NOT EXISTS password_reset_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
publisher_id INTEGER NOT NULL REFERENCES publishers(id),
token_hash TEXT NOT NULL,
expires_at TIMESTAMP NOT NULL,
used_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_reset_tokens_hash ON password_reset_tokens(token_hash);
CREATE INDEX IF NOT EXISTS idx_reset_tokens_publisher ON password_reset_tokens(publisher_id);
"""

View File

@ -101,3 +101,116 @@ def logout():
return redirect(url_for("web.login"))
session.clear()
return redirect(url_for("web.login"))
@web_bp.route("/forgot-password", methods=["GET", "POST"], endpoint="forgot_password")
def forgot_password():
success_message = None
errors = []
if request.method == "POST":
if not _validate_csrf():
return render_template(
"pages/forgot_password.html",
errors=["Invalid CSRF token"],
)
email = request.form.get("email", "").strip()
result = _api_post("/api/v1/password-reset/request", {"email": email})
if result["status"] == 429:
errors.append("Too many password reset requests. Please try again later.")
else:
# Always show success message to prevent email enumeration
success_message = "If an account with that email exists, a password reset link has been sent. Please check your email."
return render_template(
"pages/forgot_password.html",
success_message=success_message,
errors=errors,
email=email if errors else "",
)
return render_template("pages/forgot_password.html")
@web_bp.route("/reset-password", methods=["GET", "POST"], endpoint="reset_password")
def reset_password():
token = request.args.get("token") or request.form.get("token")
errors = []
success_message = None
if not token:
return render_template(
"pages/reset_password.html",
errors=["Invalid password reset link. Please request a new one."],
token_valid=False,
)
if request.method == "POST":
if not _validate_csrf():
return render_template(
"pages/reset_password.html",
errors=["Invalid CSRF token"],
token=token,
token_valid=True,
)
new_password = request.form.get("new_password", "")
confirm_password = request.form.get("confirm_password", "")
if not new_password or len(new_password) < 8:
errors.append("Password must be at least 8 characters.")
elif new_password != confirm_password:
errors.append("Passwords do not match.")
else:
result = _api_post("/api/v1/password-reset/complete", {
"token": token,
"new_password": new_password,
})
if result["status"] == 200:
success_message = "Your password has been reset successfully."
return render_template(
"pages/reset_password.html",
success_message=success_message,
token_valid=False,
)
else:
error = result["data"].get("error", {})
error_code = error.get("code", "")
if error_code == "TOKEN_EXPIRED":
errors.append("This password reset link has expired. Please request a new one.")
elif error_code == "TOKEN_USED":
errors.append("This password reset link has already been used. Please request a new one.")
else:
errors.append(error.get("message", "Failed to reset password."))
return render_template(
"pages/reset_password.html",
errors=errors,
token=token,
token_valid=True,
)
# Validate token on GET
result = _api_post("/api/v1/password-reset/validate", {"token": token})
if result["status"] != 200:
error = result["data"].get("error", {})
error_code = error.get("code", "")
if error_code == "TOKEN_EXPIRED":
errors.append("This password reset link has expired. Please request a new one.")
elif error_code == "TOKEN_USED":
errors.append("This password reset link has already been used. Please request a new one.")
else:
errors.append("Invalid password reset link. Please request a new one.")
return render_template(
"pages/reset_password.html",
errors=errors,
token_valid=False,
)
return render_template(
"pages/reset_password.html",
token=token,
token_valid=True,
)

134
src/cmdforge/web/email.py Normal file
View File

@ -0,0 +1,134 @@
"""Email sending utilities for CmdForge web.
In development mode, emails are logged to the console instead of being sent.
To enable real email sending, set MAIL_ENABLED=true and configure SMTP settings.
"""
from __future__ import annotations
import logging
from typing import Optional
from flask import current_app
logger = logging.getLogger(__name__)
def send_email(to: str, subject: str, html_body: str, text_body: Optional[str] = None) -> bool:
"""Send an email.
In dev mode (MAIL_ENABLED=false or unset), logs the email to console.
In production (MAIL_ENABLED=true), sends via SMTP.
Args:
to: Recipient email address
subject: Email subject
html_body: HTML content of the email
text_body: Plain text content (optional)
Returns:
True if email was sent/logged successfully, False otherwise
"""
mail_enabled = current_app.config.get("MAIL_ENABLED", False)
if mail_enabled:
# Future: implement real SMTP sending
# smtp_host = current_app.config.get("MAIL_SERVER", "localhost")
# smtp_port = current_app.config.get("MAIL_PORT", 587)
# smtp_user = current_app.config.get("MAIL_USERNAME")
# smtp_pass = current_app.config.get("MAIL_PASSWORD")
# smtp_tls = current_app.config.get("MAIL_USE_TLS", True)
logger.warning("MAIL_ENABLED is true but SMTP is not implemented yet. Falling back to console logging.")
# Dev mode: log to console
logger.info("=" * 60)
logger.info("[EMAIL] To: %s", to)
logger.info("[EMAIL] Subject: %s", subject)
logger.info("[EMAIL] Body:")
if text_body:
logger.info(text_body)
else:
logger.info(html_body)
logger.info("=" * 60)
return True
def send_password_reset_email(to: str, token: str, base_url: str) -> bool:
"""Send a password reset email.
Args:
to: Recipient email address
token: The password reset token
base_url: Base URL of the application (e.g., https://cmdforge.brrd.tech)
Returns:
True if email was sent/logged successfully
"""
reset_url = f"{base_url.rstrip('/')}/reset-password?token={token}"
subject = "Reset your CmdForge password"
html_body = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; line-height: 1.6; color: #333; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
.header {{ text-align: center; margin-bottom: 30px; }}
.logo {{ font-size: 24px; font-weight: bold; color: #4F46E5; }}
.button {{ display: inline-block; padding: 12px 24px; background-color: #4F46E5; color: white; text-decoration: none; border-radius: 6px; font-weight: 500; }}
.footer {{ margin-top: 40px; padding-top: 20px; border-top: 1px solid #eee; font-size: 12px; color: #666; }}
.link {{ word-break: break-all; color: #4F46E5; }}
</style>
</head>
<body>
<div class="container">
<div class="header">
<div class="logo">CmdForge</div>
</div>
<p>Hello,</p>
<p>You requested to reset your password for your CmdForge account. Click the button below to set a new password:</p>
<p style="text-align: center; margin: 30px 0;">
<a href="{reset_url}" class="button">Reset Password</a>
</p>
<p>Or copy and paste this link into your browser:</p>
<p class="link">{reset_url}</p>
<p><strong>This link will expire in 1 hour.</strong></p>
<p>If you didn't request this password reset, you can safely ignore this email. Your password will remain unchanged.</p>
<div class="footer">
<p>This email was sent by CmdForge. If you have questions, visit our website or contact support.</p>
</div>
</div>
</body>
</html>
"""
text_body = f"""Reset your CmdForge password
Hello,
You requested to reset your password for your CmdForge account.
Reset your password by visiting this link:
{reset_url}
This link will expire in 1 hour.
If you didn't request this password reset, you can safely ignore this email.
Your password will remain unchanged.
---
CmdForge
"""
return send_email(to, subject, html_body, text_body)

View File

@ -884,13 +884,6 @@ def terms():
return render_template("pages/terms.html")
@web_bp.route("/forgot-password", endpoint="forgot_password")
def forgot_password():
return render_template(
"pages/content.html",
title="Reset Password",
body="Password resets are not available yet. Please contact support if needed.",
)
@web_bp.route("/robots.txt", endpoint="robots")

View File

@ -0,0 +1,61 @@
{% extends "base.html" %}
{% from "components/forms.html" import text_input, button_primary, form_errors %}
{% block title %}Reset Password - CmdForge{% endblock %}
{% block content %}
<div class="min-h-[70vh] flex items-center justify-center px-4 py-12">
<div class="w-full max-w-md">
<div class="text-center mb-8">
<a href="{{ url_for('web.home') }}" class="inline-flex items-center justify-center">
<svg class="w-10 h-10 text-indigo-600" fill="currentColor" viewBox="0 0 24 24">
<path d="M8 9l3 3-3 3m5 0h3M5 20h14a2 2 0 002-2V6a2 2 0 00-2-2H5a2 2 0 00-2 2v12a2 2 0 002 2z"/>
</svg>
</a>
<h1 class="mt-4 text-2xl font-bold text-gray-900">Reset your password</h1>
<p class="mt-2 text-gray-600">
Enter your email address and we'll send you a link to reset your password.
</p>
</div>
<div class="bg-white rounded-lg border border-gray-200 p-8 shadow-sm">
{% if success_message %}
<div class="mb-6 p-4 bg-green-50 border border-green-200 rounded-lg">
<div class="flex">
<svg class="w-5 h-5 text-green-500 mr-3 mt-0.5" fill="currentColor" viewBox="0 0 20 20">
<path fill-rule="evenodd" d="M10 18a8 8 0 100-16 8 8 0 000 16zm3.707-9.293a1 1 0 00-1.414-1.414L9 10.586 7.707 9.293a1 1 0 00-1.414 1.414l2 2a1 1 0 001.414 0l4-4z" clip-rule="evenodd"/>
</svg>
<div>
<p class="text-sm text-green-800">{{ success_message }}</p>
</div>
</div>
</div>
{% else %}
{{ form_errors(errors) }}
<form action="{{ url_for('web.forgot_password') }}" method="POST">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
{{ text_input(
name='email',
label='Email address',
type='email',
placeholder='you@example.com',
required=true,
value=email or ''
) }}
{{ button_primary('Send reset link', full_width=true) }}
</form>
{% endif %}
</div>
<p class="mt-6 text-center text-sm text-gray-600">
Remember your password?
<a href="{{ url_for('web.login') }}" class="text-indigo-600 hover:text-indigo-800 font-medium">
Sign in
</a>
</p>
</div>
</div>
{% endblock %}

View File

@ -0,0 +1,85 @@
{% extends "base.html" %}
{% from "components/forms.html" import text_input, button_primary, form_errors %}
{% block title %}Set New Password - CmdForge{% endblock %}
{% block content %}
<div class="min-h-[70vh] flex items-center justify-center px-4 py-12">
<div class="w-full max-w-md">
<div class="text-center mb-8">
<a href="{{ url_for('web.home') }}" class="inline-flex items-center justify-center">
<svg class="w-10 h-10 text-indigo-600" fill="currentColor" viewBox="0 0 24 24">
<path d="M8 9l3 3-3 3m5 0h3M5 20h14a2 2 0 002-2V6a2 2 0 00-2-2H5a2 2 0 00-2 2v12a2 2 0 002 2z"/>
</svg>
</a>
<h1 class="mt-4 text-2xl font-bold text-gray-900">Set new password</h1>
{% if token_valid %}
<p class="mt-2 text-gray-600">
Enter your new password below.
</p>
{% endif %}
</div>
<div class="bg-white rounded-lg border border-gray-200 p-8 shadow-sm">
{% if success_message %}
<div class="mb-6 p-4 bg-green-50 border border-green-200 rounded-lg">
<div class="flex">
<svg class="w-5 h-5 text-green-500 mr-3 mt-0.5" fill="currentColor" viewBox="0 0 20 20">
<path fill-rule="evenodd" d="M10 18a8 8 0 100-16 8 8 0 000 16zm3.707-9.293a1 1 0 00-1.414-1.414L9 10.586 7.707 9.293a1 1 0 00-1.414 1.414l2 2a1 1 0 001.414 0l4-4z" clip-rule="evenodd"/>
</svg>
<div>
<p class="text-sm text-green-800">{{ success_message }}</p>
<p class="mt-2 text-sm text-green-700">
<a href="{{ url_for('web.login') }}" class="font-medium underline hover:text-green-800">
Sign in with your new password
</a>
</p>
</div>
</div>
</div>
{% elif not token_valid %}
{{ form_errors(errors) }}
<div class="text-center">
<p class="text-gray-600 mb-4">Need to reset your password?</p>
<a href="{{ url_for('web.forgot_password') }}" class="inline-flex items-center justify-center px-4 py-2 border border-transparent rounded-md shadow-sm text-sm font-medium text-white bg-indigo-600 hover:bg-indigo-700 focus:outline-none focus:ring-2 focus:ring-offset-2 focus:ring-indigo-500">
Request new reset link
</a>
</div>
{% else %}
{{ form_errors(errors) }}
<form action="{{ url_for('web.reset_password') }}" method="POST">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" name="token" value="{{ token }}">
{{ text_input(
name='new_password',
label='New password',
type='password',
placeholder='At least 8 characters',
required=true,
help='Use a mix of letters, numbers, and symbols'
) }}
{{ text_input(
name='confirm_password',
label='Confirm new password',
type='password',
placeholder='Confirm your new password',
required=true
) }}
{{ button_primary('Set new password', full_width=true) }}
</form>
{% endif %}
</div>
<p class="mt-6 text-center text-sm text-gray-600">
Remember your password?
<a href="{{ url_for('web.login') }}" class="text-indigo-600 hover:text-indigo-800 font-medium">
Sign in
</a>
</p>
</div>
</div>
{% endblock %}