Files
platform/gateway/auth.py
Yusuf Suleman 5f5660893d fix: TLS verification, cookie hardening, and proxy transport (#7)
- Renamed _ssl_ctx to _internal_ssl_ctx (explicitly scoped to internal services)
- Image proxy now uses default SSL context (TLS verification enabled for external URLs)
- Logout cookie clearing now includes HttpOnly, Secure, SameSite=Lax
- proxy.py still uses internal context (Docker services have no valid certs)

Closes #7
2026-03-29 09:13:37 -05:00

96 lines
3.0 KiB
Python

"""
Platform Gateway — Auth handlers (login, logout, register).
"""
"""
NOTE: Passwords are hashed with bcrypt. Any existing SHA-256 hashed passwords
in the database will no longer work. The admin user is re-seeded on first boot
if no users exist. Other users need manual password reset.
"""
import json
import sqlite3
import bcrypt
from database import get_db
from sessions import create_session, delete_session
def handle_login(handler, body):
try:
data = json.loads(body)
except Exception as e:
handler._send_json({"error": "Invalid JSON"}, 400)
return
username = data.get("username", "").strip().lower()
password = data.get("password", "")
if not username or not password:
handler._send_json({"error": "Username and password required"}, 400)
return
conn = get_db()
user = conn.execute("SELECT * FROM users WHERE username = ?",
(username,)).fetchone()
conn.close()
if not user or not bcrypt.checkpw(password.encode(), user["password_hash"].encode()):
handler._send_json({"error": "Invalid credentials"}, 401)
return
token = create_session(user["id"])
handler.send_response(200)
handler.send_header("Content-Type", "application/json")
handler._set_session_cookie(token)
resp = json.dumps({
"success": True,
"user": {"id": user["id"], "username": user["username"], "display_name": user["display_name"]}
}).encode()
handler.send_header("Content-Length", len(resp))
handler.end_headers()
handler.wfile.write(resp)
def handle_logout(handler):
token = handler._get_session_token()
delete_session(token)
handler.send_response(200)
handler.send_header("Content-Type", "application/json")
handler.send_header("Set-Cookie", "platform_session=; Path=/; HttpOnly; Secure; SameSite=Lax; Max-Age=0")
resp = b'{"success": true}'
handler.send_header("Content-Length", len(resp))
handler.end_headers()
handler.wfile.write(resp)
def handle_register(handler, body):
try:
data = json.loads(body)
except Exception as e:
handler._send_json({"error": "Invalid JSON"}, 400)
return
username = data.get("username", "").strip().lower()
password = data.get("password", "")
display_name = data.get("display_name", username)
if not username or not password:
handler._send_json({"error": "Username and password required"}, 400)
return
pw_hash = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
conn = get_db()
try:
conn.execute("INSERT INTO users (username, password_hash, display_name) VALUES (?, ?, ?)",
(username, pw_hash, display_name))
conn.commit()
user_id = conn.execute("SELECT id FROM users WHERE username = ?", (username,)).fetchone()["id"]
conn.close()
handler._send_json({"success": True, "user_id": user_id})
except sqlite3.IntegrityError:
conn.close()
handler._send_json({"error": "Username already exists"}, 409)