# NOTE: the lines below were page chrome captured from the code host, not part
# of the module; kept as a comment so the file stays parseable:
#   Files — platform/gateway/server.py — 1879 lines, 74 KiB, Python
#   Raw / Normal View / History
#!/usr/bin/env python3
"""
Platform Gateway Auth, session, proxy, dashboard aggregation.
Owns platform identity. Does NOT own business logic.
"""
import os
import json
import sqlite3
import hashlib
import secrets
import urllib.request
import urllib.parse
import urllib.error
import ssl
from http.server import HTTPServer, BaseHTTPRequestHandler
from http.cookies import SimpleCookie
from datetime import datetime, timedelta
from pathlib import Path
# ── Config ──
# Every setting comes from the environment with a development-friendly default.
PORT = int(os.environ.get("PORT", 8100))               # gateway listen port
DATA_DIR = Path(os.environ.get("DATA_DIR", "/app/data"))
DB_PATH = DATA_DIR / "platform.db"                     # SQLite store (users/sessions/apps)
# Service backends — internal proxy targets, also seeded into the `apps` table.
TRIPS_URL = os.environ.get("TRIPS_BACKEND_URL", "http://localhost:8087")
FITNESS_URL = os.environ.get("FITNESS_BACKEND_URL", "http://localhost:8095")
INVENTORY_URL = os.environ.get("INVENTORY_BACKEND_URL", "http://localhost:4499")
NOCODB_API_TOKEN = os.environ.get("NOCODB_API_TOKEN", "")
MINIFLUX_URL = os.environ.get("MINIFLUX_URL", "http://localhost:8767")
MINIFLUX_API_KEY = os.environ.get("MINIFLUX_API_KEY", "")
TRIPS_API_TOKEN = os.environ.get("TRIPS_API_TOKEN", "")   # shared fallback token for trips
SHELFMARK_URL = os.environ.get("SHELFMARK_URL", "http://shelfmark:8084")
SPOTIZERR_URL = os.environ.get("SPOTIZERR_URL", "http://spotizerr-app:7171")
BUDGET_URL = os.environ.get("BUDGET_BACKEND_URL", "http://localhost:3001")
# Booklore (book library manager) — gateway logs in with its own credentials.
BOOKLORE_URL = os.environ.get("BOOKLORE_URL", "http://booklore:6060")
BOOKLORE_USER = os.environ.get("BOOKLORE_USER", "")
BOOKLORE_PASS = os.environ.get("BOOKLORE_PASS", "")
# SMTP2GO (email / Send to Kindle)
SMTP2GO_API_KEY = os.environ.get("SMTP2GO_API_KEY", "")
SMTP2GO_FROM_EMAIL = os.environ.get("SMTP2GO_FROM_EMAIL", "")
SMTP2GO_FROM_NAME = os.environ.get("SMTP2GO_FROM_NAME", "Platform")
KINDLE_EMAIL_1 = os.environ.get("KINDLE_EMAIL_1", "")
KINDLE_EMAIL_2 = os.environ.get("KINDLE_EMAIL_2", "")
# Container mount points for the book library and the import drop folder.
BOOKLORE_BOOKS_DIR = Path("/booklore-books")
BOOKDROP_DIR = Path("/bookdrop")
# Karakeep (bookmarking)
KARAKEEP_URL = os.environ.get("KARAKEEP_URL", "http://192.168.1.42:3005")
KARAKEEP_API_KEY = os.environ.get("KARAKEEP_API_KEY", "")
# Process-wide cache for the Booklore JWT (see _booklore_auth).
_booklore_token = {"access": "", "refresh": "", "expires": 0}
# AI
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-5.2")
# Session config
SESSION_MAX_AGE = int(os.environ.get("SESSION_MAX_AGE", 30 * 86400)) # 30 days
DATA_DIR.mkdir(parents=True, exist_ok=True)
# Shared SSL context — certificate verification is deliberately disabled
# because backends are internal services with self-signed certs.
_ssl_ctx = ssl.create_default_context()
_ssl_ctx.check_hostname = False
_ssl_ctx.verify_mode = ssl.CERT_NONE
# ── Database ──
def get_db():
    """Open a SQLite connection with dict-style rows, FK enforcement, and WAL."""
    connection = sqlite3.connect(str(DB_PATH))
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA foreign_keys = ON", "PRAGMA journal_mode = WAL"):
        connection.execute(pragma)
    return connection
def init_db():
    """Create the schema and apply idempotent seed/migration steps.

    Safe to call on every startup: CREATE TABLE uses IF NOT EXISTS, app rows
    are only inserted when missing, and the default admin account is created
    only when the users table is empty.
    """
    conn = get_db()
    c = conn.cursor()
    # Platform accounts. NOTE(review): password_hash is unsalted SHA-256 —
    # weak for password storage; acceptable only on a trusted network.
    c.execute('''CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT UNIQUE NOT NULL,
        password_hash TEXT NOT NULL,
        display_name TEXT NOT NULL DEFAULT '',
        created_at TEXT DEFAULT CURRENT_TIMESTAMP
    )''')
    # Login sessions; token is an opaque 64-hex-char secret (see create_session).
    c.execute('''CREATE TABLE IF NOT EXISTS sessions (
        token TEXT PRIMARY KEY,
        user_id INTEGER NOT NULL,
        created_at TEXT DEFAULT CURRENT_TIMESTAMP,
        expires_at TEXT NOT NULL,
        FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
    )''')
    # Per-user tokens for proxied backend services (one row per user+service).
    c.execute('''CREATE TABLE IF NOT EXISTS service_connections (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER NOT NULL,
        service TEXT NOT NULL,
        auth_type TEXT NOT NULL DEFAULT 'bearer',
        auth_token TEXT NOT NULL,
        metadata TEXT DEFAULT '{}',
        created_at TEXT DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(user_id, service),
        FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
    )''')
    # Registry of proxied apps; proxy_target is the internal backend URL
    # (never exposed to clients — see _handle_apps).
    c.execute('''CREATE TABLE IF NOT EXISTS apps (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        icon TEXT DEFAULT '',
        route_prefix TEXT NOT NULL,
        proxy_target TEXT NOT NULL,
        sort_order INTEGER DEFAULT 0,
        enabled INTEGER DEFAULT 1,
        dashboard_widget TEXT DEFAULT NULL
    )''')
    # Items a user pinned to the dashboard, from any service.
    c.execute('''CREATE TABLE IF NOT EXISTS pinned_items (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER NOT NULL,
        service TEXT NOT NULL DEFAULT 'inventory',
        item_id TEXT NOT NULL,
        item_name TEXT NOT NULL DEFAULT '',
        pinned_at TEXT DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(user_id, service, item_id),
        FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
    )''')
    conn.commit()
    # Seed default apps on a brand-new database.
    existing = c.execute("SELECT COUNT(*) FROM apps").fetchone()[0]
    if existing == 0:
        c.execute("INSERT INTO apps VALUES ('trips', 'Trips', 'map', '/trips', ?, 1, 1, 'upcoming_trips')", (TRIPS_URL,))
        c.execute("INSERT INTO apps VALUES ('fitness', 'Fitness', 'bar-chart', '/fitness', ?, 2, 1, 'daily_calories')", (FITNESS_URL,))
        c.execute("INSERT INTO apps VALUES ('inventory', 'Inventory', 'package', '/inventory', ?, 3, 1, 'items_issues')", (INVENTORY_URL,))
        conn.commit()
    else:
        # One-off migrations appended as each app shipped.
        # NOTE(review): a brand-new DB only gets these rows on its SECOND
        # startup (the seed branch above inserts only the first three) —
        # confirm that is the intended bootstrap behavior.
        # Ensure inventory app exists (migration for existing DBs)
        inv = c.execute("SELECT id FROM apps WHERE id = 'inventory'").fetchone()
        if not inv:
            c.execute("INSERT INTO apps VALUES ('inventory', 'Inventory', 'package', '/inventory', ?, 3, 1, 'items_issues')", (INVENTORY_URL,))
            conn.commit()
            print("[Gateway] Added inventory app")
        # Ensure reader app exists
        rdr = c.execute("SELECT id FROM apps WHERE id = 'reader'").fetchone()
        if not rdr:
            c.execute("INSERT INTO apps VALUES ('reader', 'Reader', 'rss', '/reader', ?, 4, 1, 'unread_count')", (MINIFLUX_URL,))
            conn.commit()
            print("[Gateway] Added reader app")
        # Ensure books app exists (renamed to Media)
        books = c.execute("SELECT id FROM apps WHERE id = 'books'").fetchone()
        if not books:
            c.execute("INSERT INTO apps VALUES ('books', 'Media', 'book', '/media', ?, 5, 1, NULL)", (SHELFMARK_URL,))
            conn.commit()
            print("[Gateway] Added media app")
        else:
            # Re-brand a pre-existing 'books' row to the current Media naming.
            c.execute("UPDATE apps SET name = 'Media', route_prefix = '/media' WHERE id = 'books'")
            conn.commit()
        # Ensure music (spotizerr) app exists
        music = c.execute("SELECT id FROM apps WHERE id = 'music'").fetchone()
        if not music:
            c.execute("INSERT INTO apps VALUES ('music', 'Music', 'music', '/music', ?, 6, 1, NULL)", (SPOTIZERR_URL,))
            conn.commit()
            print("[Gateway] Added music app")
        # Ensure budget app exists
        budget = c.execute("SELECT id FROM apps WHERE id = 'budget'").fetchone()
        if not budget:
            c.execute("INSERT OR IGNORE INTO apps VALUES ('budget', 'Budget', 'dollar-sign', '/budget', ?, 7, 1, 'budget_summary')", (BUDGET_URL,))
            conn.commit()
            print("[Gateway] Added budget app")
    # Seed the default admin account only when no users exist at all.
    user_count = c.execute("SELECT COUNT(*) FROM users").fetchone()[0]
    if user_count == 0:
        pw_hash = hashlib.sha256("admin".encode()).hexdigest()
        c.execute("INSERT INTO users (username, password_hash, display_name) VALUES (?, ?, ?)",
                  ("admin", pw_hash, "Yusuf"))
        conn.commit()
        print("[Gateway] Created default user: admin / admin")
    conn.close()
# ── Session helpers ──
def create_session(user_id):
    """Mint a fresh session token for *user_id*, persist it, and return it."""
    token = secrets.token_hex(32)
    expires_at = (datetime.now() + timedelta(seconds=SESSION_MAX_AGE)).isoformat()
    db = get_db()
    db.execute("INSERT INTO sessions (token, user_id, expires_at) VALUES (?, ?, ?)",
               (token, user_id, expires_at))
    db.commit()
    db.close()
    return token
def get_session_user(token):
    """Resolve a session token to its (unexpired) user row as a dict, else None."""
    if not token:
        return None
    now = datetime.now().isoformat()
    db = get_db()
    row = db.execute("""
        SELECT u.* FROM sessions s
        JOIN users u ON s.user_id = u.id
        WHERE s.token = ? AND s.expires_at > ?
        """, (token, now)).fetchone()
    db.close()
    return dict(row) if row else None
def delete_session(token):
    """Remove a session row; silently ignores an empty/None token."""
    if not token:
        return
    db = get_db()
    db.execute("DELETE FROM sessions WHERE token = ?", (token,))
    db.commit()
    db.close()
def get_service_token(user_id, service):
    """Fetch the stored auth record for (user, service) as a dict, or None."""
    db = get_db()
    record = db.execute(
        "SELECT auth_type, auth_token FROM service_connections WHERE user_id = ? AND service = ?",
        (user_id, service)
    ).fetchone()
    db.close()
    return dict(record) if record else None
def set_service_token(user_id, service, auth_token, auth_type="bearer"):
    """Insert or replace the auth record for (user, service) via upsert."""
    params = (user_id, service, auth_type, auth_token, auth_token, auth_type)
    db = get_db()
    db.execute("""
        INSERT INTO service_connections (user_id, service, auth_type, auth_token)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(user_id, service) DO UPDATE SET auth_token = ?, auth_type = ?
        """, params)
    db.commit()
    db.close()
def delete_service_token(user_id, service):
    """Drop the stored auth record for (user, service), if one exists."""
    db = get_db()
    db.execute(
        "DELETE FROM service_connections WHERE user_id = ? AND service = ?",
        (user_id, service))
    db.commit()
    db.close()
# ── Proxy helper ──
def proxy_request(target_url, method, headers, body=None, timeout=120):
    """Forward one request to a backend service.

    Returns (status, response_headers, response_body). Backend HTTP errors are
    passed through as-is; any transport failure maps to a synthetic 502 with a
    JSON error payload.
    """
    try:
        request = urllib.request.Request(target_url, data=body, method=method)
        for name, value in headers.items():
            request.add_header(name, value)
        with urllib.request.urlopen(request, context=_ssl_ctx, timeout=timeout) as response:
            return response.status, dict(response.headers), response.read()
    except urllib.error.HTTPError as err:
        payload = err.read() if err.fp else b'{}'
        return err.code, dict(err.headers), payload
    except Exception as err:
        message = json.dumps({"error": f"Service unavailable: {err}"}).encode()
        return 502, {"Content-Type": "application/json"}, message
# ── Service routing ──
# ── Service routing ──
SERVICE_MAP = {}  # app id -> backend base URL; populated from the DB at startup

# Routing tables hoisted to module scope — they were rebuilt on every request.
# Services whose backends do not mount their routes under /api (Express apps, etc.).
NO_API_PREFIX_SERVICES = {"inventory", "music", "budget"}
# Services whose backends expect an extra path prefix (Miniflux uses /v1).
SERVICE_PATH_PREFIX = {"reader": "/v1"}

def load_service_map():
    """Refresh SERVICE_MAP from the enabled rows of the apps table."""
    global SERVICE_MAP
    conn = get_db()
    rows = conn.execute("SELECT id, proxy_target FROM apps WHERE enabled = 1").fetchall()
    conn.close()
    SERVICE_MAP = {row["id"]: row["proxy_target"] for row in rows}

def resolve_service(path):
    """Map a gateway path to its backend.

    Given '/api/trips/foo', returns ('trips', 'http://backend:8087', '/api/foo').
    Returns (None, None, None) for malformed paths or unknown services.
    """
    parts = path.split("/", 4)  # ['', 'api', '<service>', <seg>, <rest>]
    if len(parts) < 3:
        return None, None, None
    service_id = parts[2]
    target = SERVICE_MAP.get(service_id)
    if not target:
        return None, None, None
    remainder = "/" + "/".join(parts[3:]) if len(parts) > 3 else "/"
    if service_id in SERVICE_PATH_PREFIX:
        backend_path = f"{SERVICE_PATH_PREFIX[service_id]}{remainder}"
    elif service_id in NO_API_PREFIX_SERVICES:
        backend_path = remainder
    elif remainder.startswith("/images/") or remainder.startswith("/documents/"):
        # Static assets pass through without the /api prefix.
        backend_path = remainder
    else:
        backend_path = f"/api{remainder}"
    return service_id, target, backend_path
# ── Request handler ──
class GatewayHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
print(f"[{datetime.now().strftime('%H:%M:%S')}] {args[0]}")
def _send_json(self, data, status=200):
body = json.dumps(data).encode()
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", len(body))
self.end_headers()
self.wfile.write(body)
def _get_session_token(self):
cookie = SimpleCookie(self.headers.get("Cookie", ""))
if "platform_session" in cookie:
return cookie["platform_session"].value
auth = self.headers.get("Authorization", "")
if auth.startswith("Bearer "):
return auth[7:]
return None
def _get_user(self):
token = self._get_session_token()
return get_session_user(token)
def _require_auth(self):
user = self._get_user()
if not user:
self._send_json({"error": "Unauthorized"}, 401)
return None
return user
def _set_session_cookie(self, token):
self.send_header("Set-Cookie",
f"platform_session={token}; Path=/; HttpOnly; SameSite=Lax; Max-Age={SESSION_MAX_AGE}")
def _read_body(self):
length = int(self.headers.get("Content-Length", 0))
return self.rfile.read(length) if length > 0 else b""
# ── Routing ──
def do_OPTIONS(self):
self.send_response(204)
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization, Cookie")
self.send_header("Access-Control-Allow-Methods", "GET, POST, PUT, PATCH, DELETE, OPTIONS")
self.end_headers()
    def do_GET(self):
        """Route GET requests.

        Order matters: image passthrough, then public endpoints, then
        protected gateway features, and finally the generic per-service
        proxy for anything else under /api/.
        """
        path = self.path.split("?")[0]  # strip the query string before matching
        # Image serving — probe every backend in turn until one returns 200.
        if path.startswith("/images/"):
            user = self._get_user()
            for service_id, target in SERVICE_MAP.items():
                headers = {}
                if user:
                    svc_token = get_service_token(user["id"], service_id)
                    if svc_token:
                        headers["Authorization"] = f"Bearer {svc_token['auth_token']}"
                # Shared trips token as a fallback when the user has no
                # personal trips connection.
                if not headers.get("Authorization") and service_id == "trips" and TRIPS_API_TOKEN:
                    headers["Authorization"] = f"Bearer {TRIPS_API_TOKEN}"
                status, resp_headers, resp_body = proxy_request(f"{target}{path}", "GET", headers, timeout=15)
                if status == 200:
                    self.send_response(200)
                    ct = resp_headers.get("Content-Type", "application/octet-stream")
                    self.send_header("Content-Type", ct)
                    self.send_header("Content-Length", len(resp_body))
                    self.send_header("Cache-Control", "public, max-age=86400")
                    self.end_headers()
                    self.wfile.write(resp_body)
                    return
            self._send_json({"error": "Image not found"}, 404)
            return
        # Health check (public)
        if path == "/api/health":
            self._send_json({"status": "ok", "service": "gateway"})
            return
        # Public: reports auth state without requiring it
        if path == "/api/auth/me":
            user = self._get_user()
            if user:
                self._send_json({
                    "authenticated": True,
                    "user": {"id": user["id"], "username": user["username"], "display_name": user["display_name"]}
                })
            else:
                self._send_json({"authenticated": False, "user": None})
            return
        # Protected gateway routes
        if path == "/api/apps":
            user = self._require_auth()
            if not user:
                return
            self._handle_apps(user)
            return
        if path == "/api/me":
            user = self._require_auth()
            if not user:
                return
            self._handle_me(user)
            return
        if path == "/api/me/connections":
            user = self._require_auth()
            if not user:
                return
            self._handle_connections(user)
            return
        if path == "/api/dashboard":
            user = self._require_auth()
            if not user:
                return
            self._handle_dashboard(user)
            return
        if path == "/api/pinned":
            user = self._require_auth()
            if not user:
                return
            self._handle_get_pinned(user)
            return
        # Booklore books list
        if path == "/api/booklore/books":
            user = self._require_auth()
            if not user:
                return
            self._handle_booklore_books()
            return
        # Kindle targets — built inline from environment configuration.
        if path == "/api/kindle/targets":
            user = self._require_auth()
            if not user:
                return
            kindle_labels = os.environ.get("KINDLE_LABELS", "Kindle 1,Kindle 2").split(",")
            targets = []
            if KINDLE_EMAIL_1:
                targets.append({"id": "1", "label": kindle_labels[0].strip() if kindle_labels else "Kindle 1", "email": KINDLE_EMAIL_1})
            if KINDLE_EMAIL_2:
                targets.append({"id": "2", "label": kindle_labels[1].strip() if len(kindle_labels) > 1 else "Kindle 2", "email": KINDLE_EMAIL_2})
            self._send_json({"targets": targets, "configured": bool(SMTP2GO_API_KEY and SMTP2GO_FROM_EMAIL)})
            return
        # Booklore book cover proxy — /api/booklore/books/<id>/cover
        if path.startswith("/api/booklore/books/") and path.endswith("/cover"):
            user = self._require_auth()
            if not user:
                return
            book_id = path.split("/")[4]
            self._handle_booklore_cover(book_id)
            return
        # Downloads status (qBittorrent)
        if path == "/api/downloads/status":
            user = self._require_auth()
            if not user:
                return
            self._handle_downloads_status()
            return
        # Booklore libraries
        if path == "/api/booklore/libraries":
            user = self._require_auth()
            if not user:
                return
            self._handle_booklore_libraries()
            return
        # Image proxy — fetch external images server-side to bypass hotlink blocks
        if path == "/api/image-proxy":
            user = self._require_auth()
            if not user:
                return
            self._handle_image_proxy()
            return
        # Fallback: generic proxy to the owning backend service.
        # NOTE(review): this path performs no gateway-side auth check — the
        # backend is trusted to enforce its own; confirm that is intentional.
        if path.startswith("/api/"):
            self._proxy("GET", path)
            return
        self._send_json({"error": "Not found"}, 404)
    def do_POST(self):
        """Route POST requests.

        Public auth endpoints first, then protected gateway features, and
        finally the generic per-service proxy for anything else under /api/.
        """
        path = self.path.split("?")[0]
        body = self._read_body()  # read once; handlers receive the raw bytes
        # Auth routes (public)
        if path == "/api/auth/login":
            self._handle_login(body)
            return
        if path == "/api/auth/logout":
            self._handle_logout()
            return
        if path == "/api/auth/register":
            self._handle_register(body)
            return
        # Connection management
        if path == "/api/me/connections":
            user = self._require_auth()
            if not user:
                return
            self._handle_set_connection(user, body)
            return
        # Pin/unpin dashboard items
        if path == "/api/pin":
            user = self._require_auth()
            if not user:
                return
            self._handle_pin(user, body)
            return
        if path == "/api/unpin":
            user = self._require_auth()
            if not user:
                return
            self._handle_unpin(user, body)
            return
        # Send downloaded file to Kindle (by filename from bookdrop)
        if path == "/api/kindle/send-file":
            user = self._require_auth()
            if not user:
                return
            self._handle_send_file_to_kindle(body)
            return
        # Send book to Kindle — /api/booklore/books/<id>/send-to-kindle
        if path.startswith("/api/booklore/books/") and path.endswith("/send-to-kindle"):
            user = self._require_auth()
            if not user:
                return
            book_id = path.split("/")[4]
            self._handle_send_to_kindle(book_id, body)
            return
        # Booklore auto-import from bookdrop
        if path == "/api/booklore/import":
            user = self._require_auth()
            if not user:
                return
            self._handle_booklore_import(body)
            return
        # Karakeep bookmarking
        if path == "/api/karakeep/save":
            user = self._require_auth()
            if not user:
                return
            self._handle_karakeep_save(body)
            return
        if path == "/api/karakeep/delete":
            user = self._require_auth()
            if not user:
                return
            self._handle_karakeep_delete(body)
            return
        # Command bar — execute natural language actions
        if path == "/api/command":
            user = self._require_auth()
            if not user:
                return
            self._handle_command(user, body)
            return
        # Fallback: generic proxy to the owning backend service
        if path.startswith("/api/"):
            self._proxy("POST", path, body)
            return
        self._send_json({"error": "Not found"}, 404)
def do_PUT(self):
path = self.path.split("?")[0]
body = self._read_body()
if path.startswith("/api/"):
self._proxy("PUT", path, body)
return
self._send_json({"error": "Not found"}, 404)
def do_PATCH(self):
path = self.path.split("?")[0]
body = self._read_body()
if path.startswith("/api/"):
self._proxy("PATCH", path, body)
return
self._send_json({"error": "Not found"}, 404)
def do_DELETE(self):
path = self.path.split("?")[0]
body = self._read_body()
if path.startswith("/api/"):
self._proxy("DELETE", path, body)
return
self._send_json({"error": "Not found"}, 404)
# ── Auth handlers ──
def _handle_login(self, body):
try:
data = json.loads(body)
except:
self._send_json({"error": "Invalid JSON"}, 400)
return
username = data.get("username", "").strip().lower()
password = data.get("password", "")
if not username or not password:
self._send_json({"error": "Username and password required"}, 400)
return
pw_hash = hashlib.sha256(password.encode()).hexdigest()
conn = get_db()
user = conn.execute("SELECT * FROM users WHERE username = ? AND password_hash = ?",
(username, pw_hash)).fetchone()
conn.close()
if not user:
self._send_json({"error": "Invalid credentials"}, 401)
return
token = create_session(user["id"])
self.send_response(200)
self.send_header("Content-Type", "application/json")
self._set_session_cookie(token)
resp = json.dumps({
"success": True,
"user": {"id": user["id"], "username": user["username"], "display_name": user["display_name"]}
}).encode()
self.send_header("Content-Length", len(resp))
self.end_headers()
self.wfile.write(resp)
def _handle_logout(self):
token = self._get_session_token()
delete_session(token)
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Set-Cookie", "platform_session=; Path=/; Max-Age=0")
resp = b'{"success": true}'
self.send_header("Content-Length", len(resp))
self.end_headers()
self.wfile.write(resp)
def _handle_register(self, body):
try:
data = json.loads(body)
except:
self._send_json({"error": "Invalid JSON"}, 400)
return
username = data.get("username", "").strip().lower()
password = data.get("password", "")
display_name = data.get("display_name", username)
if not username or not password:
self._send_json({"error": "Username and password required"}, 400)
return
pw_hash = hashlib.sha256(password.encode()).hexdigest()
conn = get_db()
try:
conn.execute("INSERT INTO users (username, password_hash, display_name) VALUES (?, ?, ?)",
(username, pw_hash, display_name))
conn.commit()
user_id = conn.execute("SELECT id FROM users WHERE username = ?", (username,)).fetchone()["id"]
conn.close()
self._send_json({"success": True, "user_id": user_id})
except sqlite3.IntegrityError:
conn.close()
self._send_json({"error": "Username already exists"}, 409)
# ── Platform data handlers ──
def _handle_apps(self, user):
conn = get_db()
rows = conn.execute("SELECT * FROM apps WHERE enabled = 1 ORDER BY sort_order").fetchall()
# Check which services the user has connected
connections = conn.execute("SELECT service FROM service_connections WHERE user_id = ?",
(user["id"],)).fetchall()
connected = {r["service"] for r in connections}
conn.close()
apps = []
for row in rows:
app = dict(row)
app["connected"] = app["id"] in connected
del app["proxy_target"] # don't expose internal URLs
apps.append(app)
self._send_json({"apps": apps})
def _handle_me(self, user):
conn = get_db()
connections = conn.execute(
"SELECT service, auth_type, created_at FROM service_connections WHERE user_id = ?",
(user["id"],)
).fetchall()
conn.close()
self._send_json({
"user": {
"id": user["id"],
"username": user["username"],
"display_name": user["display_name"],
"created_at": user["created_at"]
},
"connections": [dict(c) for c in connections]
})
def _handle_connections(self, user):
conn = get_db()
connections = conn.execute(
"SELECT service, auth_type, created_at FROM service_connections WHERE user_id = ?",
(user["id"],)
).fetchall()
conn.close()
self._send_json({"connections": [dict(c) for c in connections]})
def _handle_set_connection(self, user, body):
try:
data = json.loads(body)
except:
self._send_json({"error": "Invalid JSON"}, 400)
return
service = data.get("service", "")
action = data.get("action", "connect") # connect or disconnect
if action == "disconnect":
delete_service_token(user["id"], service)
self._send_json({"success": True})
return
auth_token = data.get("token", "")
auth_type = data.get("auth_type", "bearer")
if not service or not auth_token:
self._send_json({"error": "service and token required"}, 400)
return
# Validate token against the service
target = SERVICE_MAP.get(service)
if not target:
self._send_json({"error": f"Unknown service: {service}"}, 400)
return
# Test the token
if service == "trips":
test_url = f"{target}/api/trips"
headers = {"Authorization": f"Bearer {auth_token}"}
elif service == "fitness":
test_url = f"{target}/api/users"
headers = {"Authorization": f"Bearer {auth_token}"}
else:
test_url = f"{target}/api/health"
headers = {"Authorization": f"Bearer {auth_token}"}
status, _, _ = proxy_request(test_url, "GET", headers, timeout=10)
if status == 401:
self._send_json({"error": "Invalid token for this service"}, 401)
return
set_service_token(user["id"], service, auth_token, auth_type)
self._send_json({"success": True})
def _handle_pin(self, user, body):
try:
data = json.loads(body)
except:
self._send_json({"error": "Invalid JSON"}, 400)
return
item_id = str(data.get("item_id", ""))
item_name = data.get("item_name", "")
service = data.get("service", "inventory")
if not item_id:
self._send_json({"error": "item_id required"}, 400)
return
conn = get_db()
conn.execute("""
INSERT INTO pinned_items (user_id, service, item_id, item_name)
VALUES (?, ?, ?, ?)
ON CONFLICT(user_id, service, item_id) DO UPDATE SET item_name = ?, pinned_at = CURRENT_TIMESTAMP
""", (user["id"], service, item_id, item_name, item_name))
conn.commit()
conn.close()
self._send_json({"success": True})
def _handle_unpin(self, user, body):
try:
data = json.loads(body)
except:
self._send_json({"error": "Invalid JSON"}, 400)
return
item_id = str(data.get("item_id", ""))
service = data.get("service", "inventory")
conn = get_db()
conn.execute("DELETE FROM pinned_items WHERE user_id = ? AND service = ? AND item_id = ?",
(user["id"], service, item_id))
conn.commit()
conn.close()
self._send_json({"success": True})
# ── Booklore integration ──
    def _booklore_auth(self):
        """Get a valid Booklore JWT access token, logging in again when stale.

        Uses the module-level _booklore_token cache; a cached token is reused
        until 60s before its recorded expiry. Expiry is assumed to be one hour
        from login (NOTE(review): the JWT's real `exp` claim is not inspected
        — confirm Booklore's token lifetime matches). Returns the access token
        string, or None when credentials are missing or login fails.
        """
        import time
        global _booklore_token
        # Fast path: cached token still valid (with a 60s safety margin).
        if _booklore_token["access"] and time.time() < _booklore_token["expires"] - 60:
            return _booklore_token["access"]
        if not BOOKLORE_USER or not BOOKLORE_PASS:
            return None
        try:
            body = json.dumps({"username": BOOKLORE_USER, "password": BOOKLORE_PASS}).encode()
            status, _, resp = proxy_request(
                f"{BOOKLORE_URL}/api/v1/auth/login", "POST",
                {"Content-Type": "application/json"}, body, timeout=10
            )
            if status == 200:
                data = json.loads(resp)
                _booklore_token["access"] = data["accessToken"]
                _booklore_token["refresh"] = data.get("refreshToken", "")
                _booklore_token["expires"] = time.time() + 3600 # 1hr (assumed)
                return _booklore_token["access"]
        except Exception as e:
            # Best-effort: a failed login is logged and reported as None.
            print(f"[Booklore] Auth failed: {e}")
        return None
def _handle_booklore_libraries(self):
"""Return Booklore libraries with their paths."""
token = self._booklore_auth()
if not token:
self._send_json({"error": "Booklore auth failed"}, 502)
return
status, _, resp = proxy_request(
f"{BOOKLORE_URL}/api/v1/libraries", "GET",
{"Authorization": f"Bearer {token}"}, timeout=10
)
if status == 200:
libs = json.loads(resp)
result = []
for lib in libs:
paths = [{"id": p["id"], "path": p.get("path", "")} for p in lib.get("paths", [])]
result.append({"id": lib["id"], "name": lib["name"], "paths": paths})
self._send_json({"libraries": result})
else:
self._send_json({"error": "Failed to fetch libraries"}, status)
def _handle_booklore_import(self, body):
"""Auto-import a file from bookdrop into a Booklore library.
Expects: {"fileName": "...", "libraryId": N, "pathId": N}
Flow: rescan bookdrop find file finalize import
"""
import time
try:
data = json.loads(body)
except:
self._send_json({"error": "Invalid JSON"}, 400)
return
file_name = data.get("fileName", "")
library_id = data.get("libraryId")
path_id = data.get("pathId")
if not file_name or not library_id or not path_id:
self._send_json({"error": "Missing fileName, libraryId, or pathId"}, 400)
return
token = self._booklore_auth()
if not token:
self._send_json({"error": "Booklore auth failed"}, 502)
return
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
# 1. Trigger bookdrop rescan
proxy_request(f"{BOOKLORE_URL}/api/v1/bookdrop/rescan", "POST", headers, b"{}", timeout=15)
# 2. Poll for the file to appear (up to 15 seconds)
file_id = None
file_meta = None
for _ in range(6):
time.sleep(2.5)
s, _, r = proxy_request(
f"{BOOKLORE_URL}/api/v1/bookdrop/files?status=pending&page=0&size=100",
"GET", {"Authorization": f"Bearer {token}"}, timeout=10
)
if s == 200:
files_data = json.loads(r)
for f in files_data.get("content", []):
if f.get("fileName", "") == file_name:
file_id = f["id"]
file_meta = f.get("originalMetadata") or f.get("fetchedMetadata")
break
if file_id:
break
if not file_id:
self._send_json({"error": "File not found in bookdrop after rescan", "fileName": file_name}, 404)
return
# 3. Build metadata with thumbnailUrl (required by Booklore)
metadata = {
"title": (file_meta or {}).get("title", file_name),
"subtitle": "",
"authors": (file_meta or {}).get("authors", []),
"categories": (file_meta or {}).get("categories", []),
"moods": [],
"tags": [],
"publisher": (file_meta or {}).get("publisher", ""),
"publishedDate": (file_meta or {}).get("publishedDate", ""),
"description": (file_meta or {}).get("description", ""),
"isbn": (file_meta or {}).get("isbn13", (file_meta or {}).get("isbn10", "")),
"language": (file_meta or {}).get("language", ""),
"seriesName": (file_meta or {}).get("seriesName", ""),
"seriesNumber": (file_meta or {}).get("seriesNumber"),
"seriesTotal": (file_meta or {}).get("seriesTotal"),
"thumbnailUrl": (file_meta or {}).get("thumbnailUrl", ""),
}
# 4. Finalize import
payload = json.dumps({"files": [{"fileId": file_id, "libraryId": library_id, "pathId": path_id, "metadata": metadata}]}).encode()
s, _, r = proxy_request(
f"{BOOKLORE_URL}/api/v1/bookdrop/imports/finalize", "POST",
headers, payload, timeout=30
)
if s == 200:
result = json.loads(r)
self._send_json(result)
else:
print(f"[Booklore] Finalize failed ({s}): {r[:200]}")
self._send_json({"error": "Finalize import failed", "status": s}, s)
    def _handle_booklore_books(self):
        """Return all books from Booklore, with the file format resolved from disk.

        Booklore's list API does not expose the file format, so each title's
        first four words are fuzzy-matched against ebook file stems under
        BOOKLORE_BOOKS_DIR; a book gets a format only when at least half of
        those words appear in the best-matching filename.
        """
        token = self._booklore_auth()
        if not token:
            self._send_json({"error": "Booklore auth failed"}, 502)
            return
        try:
            s, _, r = proxy_request(
                f"{BOOKLORE_URL}/api/v1/books", "GET",
                {"Authorization": f"Bearer {token}"}, timeout=15
            )
            if s == 200:
                books_raw = json.loads(r)
                books = []
                for b in books_raw:
                    m = b.get("metadata") or {}
                    books.append({
                        "id": b["id"],
                        "title": m.get("title") or "Untitled",
                        "authors": m.get("authors") or [],
                        "libraryId": b.get("libraryId"),
                        "libraryName": b.get("libraryName"),
                        "categories": m.get("categories") or [],
                        "pageCount": m.get("pageCount"),
                        "publisher": m.get("publisher"),
                        "isbn13": m.get("isbn13"),
                        "isbn10": m.get("isbn10"),
                        "googleId": m.get("googleId"),
                        "addedOn": b.get("addedOn"),
                    })
                # Resolve file formats from disk
                if BOOKLORE_BOOKS_DIR.exists():
                    # Build index once: lowercase file stem -> file path.
                    file_index = {}
                    for ext in ["epub", "pdf", "mobi", "azw3"]:
                        for fp in BOOKLORE_BOOKS_DIR.rglob(f"*.{ext}"):
                            file_index[fp.stem.lower()] = fp
                    for book in books:
                        title_words = set(book["title"].lower().split()[:4])
                        best_match = None
                        best_score = 0
                        for fname, fp in file_index.items():
                            # Fraction of title words present in the filename.
                            matches = sum(1 for w in title_words if w in fname)
                            score = matches / len(title_words) if title_words else 0
                            if score > best_score:
                                best_score = score
                                best_match = fp
                        # Require at least half the title words to match.
                        if best_match and best_score >= 0.5:
                            book["format"] = best_match.suffix.lstrip(".").upper()
                        else:
                            book["format"] = None
                self._send_json({"books": books, "total": len(books)})
            else:
                self._send_json({"error": "Failed to fetch books"}, s)
        except Exception as e:
            self._send_json({"error": str(e)}, 500)
def _handle_booklore_cover(self, book_id):
"""Proxy book cover image from Booklore."""
token = self._booklore_auth()
if not token:
self._send_json({"error": "Booklore auth failed"}, 502)
return
try:
s, headers_raw, body = proxy_request(
f"{BOOKLORE_URL}/api/v1/books/{book_id}/cover", "GET",
{"Authorization": f"Bearer {token}"}, timeout=10
)
if s == 200 and isinstance(body, bytes):
ct = "image/jpeg"
if isinstance(headers_raw, dict):
ct = headers_raw.get("Content-Type", ct)
self.send_response(200)
self.send_header("Content-Type", ct)
self.send_header("Cache-Control", "public, max-age=86400")
self.end_headers()
if isinstance(body, str):
self.wfile.write(body.encode())
else:
self.wfile.write(body)
return
self._send_json({"error": "Cover not found"}, 404)
except Exception as e:
self._send_json({"error": str(e)}, 500)
def _handle_downloads_status(self):
"""Get active downloads from qBittorrent."""
import urllib.parse
qbt_host = os.environ.get("QBITTORRENT_HOST", "192.168.1.42")
qbt_port = os.environ.get("QBITTORRENT_PORT", "8080")
qbt_user = os.environ.get("QBITTORRENT_USERNAME", "admin")
qbt_pass = os.environ.get("QBITTORRENT_PASSWORD", "")
base = f"http://{qbt_host}:{qbt_port}"
try:
# Login
login_data = urllib.parse.urlencode({"username": qbt_user, "password": qbt_pass}).encode()
req = urllib.request.Request(f"{base}/api/v2/auth/login", data=login_data)
with urllib.request.urlopen(req, timeout=5) as resp:
cookie = resp.headers.get("Set-Cookie", "").split(";")[0]
# Get torrents
req2 = urllib.request.Request(f"{base}/api/v2/torrents/info?filter=all&sort=added_on&reverse=true&limit=20",
headers={"Cookie": cookie})
with urllib.request.urlopen(req2, timeout=5) as resp2:
torrents_raw = json.loads(resp2.read())
torrents = []
for t in torrents_raw:
torrents.append({
"hash": t["hash"],
"name": t["name"],
"progress": round(t["progress"] * 100, 1),
"state": t["state"],
"size": t["total_size"],
"downloaded": t["downloaded"],
"dlSpeed": t["dlspeed"],
"upSpeed": t["upspeed"],
"eta": t.get("eta", 0),
"addedOn": t.get("added_on", 0),
"category": t.get("category", ""),
})
self._send_json({"torrents": torrents, "total": len(torrents)})
except Exception as e:
self._send_json({"torrents": [], "total": 0, "error": str(e)})
def _find_book_file(self, book_id: str) -> tuple[Path | None, dict | None]:
    """Find the actual ebook file for a Booklore book ID.

    Looks up the book's metadata in Booklore, then fuzzy-matches its
    title against filenames under BOOKLORE_BOOKS_DIR.

    Returns (file_path, book_metadata). file_path is None when no
    sufficiently confident match exists; both are None when auth or the
    metadata lookup fails.
    """
    token = self._booklore_auth()
    if not token:
        return None, None
    # Get book metadata from Booklore.
    s, _, r = proxy_request(
        f"{BOOKLORE_URL}/api/v1/books/{book_id}", "GET",
        {"Authorization": f"Bearer {token}"}, timeout=10
    )
    if s != 200:
        return None, None
    book = json.loads(r)
    meta = book.get("metadata", {})
    title = meta.get("title", "")
    if not title or not BOOKLORE_BOOKS_DIR.exists():
        return None, meta
    # Fuzzy match: fraction of the title's first 4 words found in the
    # file stem, plus a small bonus so epub > pdf > mobi > azw3 on ties.
    title_words = set(title.lower().split()[:4])
    ext_bonus = {".epub": 0.1, ".pdf": 0.05, ".mobi": 0.03, ".azw3": 0.02}
    best_match = None
    best_score = 0.0
    # Single directory walk (the original scanned the whole tree once per
    # extension — four passes). Extension matching is case-insensitive.
    for filepath in BOOKLORE_BOOKS_DIR.rglob("*"):
        bonus = ext_bonus.get(filepath.suffix.lower())
        if bonus is None or not filepath.is_file():
            continue
        fname = filepath.stem.lower()
        matches = sum(1 for w in title_words if w in fname)
        score = (matches / len(title_words) if title_words else 0) + bonus
        if score > best_score:
            best_score = score
            best_match = filepath
    # Require at least ~half of the title words to appear in the filename.
    if best_match and best_score >= 0.5:
        return best_match, meta
    return None, meta
def _handle_send_to_kindle(self, book_id: str, body: bytes):
    """Send a book file to a Kindle email via SMTP2GO API.

    Request body: {"target": "1"|"2"} selecting KINDLE_EMAIL_1/2.
    Locates the ebook on disk via _find_book_file, base64-encodes it,
    and posts it as an email attachment through SMTP2GO.
    """
    if not SMTP2GO_API_KEY or not SMTP2GO_FROM_EMAIL:
        self._send_json({"error": "Email not configured"}, 502)
        return
    try:
        data = json.loads(body)
    except (ValueError, TypeError):  # narrowed from bare except: bad/empty body
        self._send_json({"error": "Invalid JSON"}, 400)
        return
    target = data.get("target", "1")
    kindle_email = KINDLE_EMAIL_1 if target == "1" else KINDLE_EMAIL_2
    if not kindle_email:
        self._send_json({"error": f"Kindle target {target} not configured"}, 400)
        return
    # Find the book file on disk (fuzzy title match against the library).
    file_path, meta = self._find_book_file(book_id)
    if not file_path or not file_path.exists():
        self._send_json({"error": "Book file not found on disk"}, 404)
        return
    title = meta.get("title", "Book") if meta else "Book"
    author = ", ".join(meta.get("authors", [])) if meta else ""
    # Read file and encode as base64 — SMTP2GO takes attachments as blobs.
    import base64
    file_data = file_path.read_bytes()
    file_b64 = base64.b64encode(file_data).decode("ascii")
    filename = file_path.name
    # Determine MIME type from the extension (Kindle-supported formats).
    ext = file_path.suffix.lower()
    mime_map = {".epub": "application/epub+zip", ".pdf": "application/pdf", ".mobi": "application/x-mobipocket-ebook", ".azw3": "application/x-mobi8-ebook"}
    mime_type = mime_map.get(ext, "application/octet-stream")
    # Send via SMTP2GO API.
    email_payload = {
        "api_key": SMTP2GO_API_KEY,
        "sender": f"{SMTP2GO_FROM_NAME} <{SMTP2GO_FROM_EMAIL}>",
        "to": [kindle_email],
        "subject": f"{title}" + (f" - {author}" if author else ""),
        "text_body": f"Sent from Platform: {title}" + (f" by {author}" if author else ""),
        "attachments": [{
            "filename": filename,
            "fileblob": file_b64,
            "mimetype": mime_type,
        }]
    }
    try:
        req_body = json.dumps(email_payload).encode("utf-8")
        req = urllib.request.Request(
            "https://api.smtp2go.com/v3/email/send",
            data=req_body,
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=30) as resp:
            result = json.loads(resp.read())
        # SMTP2GO reports per-recipient success in data.succeeded.
        if result.get("data", {}).get("succeeded", 0) > 0:
            self._send_json({
                "success": True,
                "title": title,
                "sentTo": kindle_email,
                "format": ext.lstrip(".").upper(),
                "size": len(file_data),
            })
        else:
            self._send_json({"error": "Email send failed", "detail": result}, 500)
    except Exception as e:
        self._send_json({"error": f"SMTP2GO error: {str(e)}"}, 500)
def _handle_send_file_to_kindle(self, body: bytes):
    """Send a downloaded file to Kindle by filename from bookdrop directory.

    Request body: {"filename": "...", "target": "1"|"2", "title": "..."}.
    Searches BOOKDROP_DIR then BOOKLORE_BOOKS_DIR for an exact basename
    match and emails the file via SMTP2GO.
    """
    if not SMTP2GO_API_KEY or not SMTP2GO_FROM_EMAIL:
        self._send_json({"error": "Email not configured"}, 502)
        return
    try:
        data = json.loads(body)
    except (ValueError, TypeError):  # narrowed from bare except: bad/empty body
        self._send_json({"error": "Invalid JSON"}, 400)
        return
    filename = data.get("filename", "")
    target = data.get("target", "1")
    title = data.get("title", filename)
    if not filename:
        # Fail fast instead of scanning both trees for an empty name.
        self._send_json({"error": "Missing filename"}, 400)
        return
    kindle_email = KINDLE_EMAIL_1 if target == "1" else KINDLE_EMAIL_2
    if not kindle_email:
        self._send_json({"error": f"Kindle target {target} not configured"}, 400)
        return
    # Exact basename match only — comparing fp.name prevents callers from
    # traversing outside the search directories.
    file_path = None
    for search_dir in [BOOKDROP_DIR, BOOKLORE_BOOKS_DIR]:
        if not search_dir.exists():
            continue
        for fp in search_dir.rglob("*"):
            if fp.is_file() and fp.name == filename:
                file_path = fp
                break
        if file_path:
            break
    if not file_path or not file_path.exists():
        # Fix: the original f-string had no placeholder and always printed
        # "(unknown)"; include the requested name for debuggability.
        self._send_json({"error": f"File not found: {filename}"}, 404)
        return
    # Base64-encode the attachment for the SMTP2GO API.
    import base64
    file_data = file_path.read_bytes()
    file_b64 = base64.b64encode(file_data).decode("ascii")
    ext = file_path.suffix.lower()
    # Map extension → MIME type (Kindle-supported ebook formats).
    mime_map = {".epub": "application/epub+zip", ".pdf": "application/pdf", ".mobi": "application/x-mobipocket-ebook", ".azw3": "application/x-mobi8-ebook"}
    mime_type = mime_map.get(ext, "application/octet-stream")
    email_payload = {
        "api_key": SMTP2GO_API_KEY,
        "sender": f"{SMTP2GO_FROM_NAME} <{SMTP2GO_FROM_EMAIL}>",
        "to": [kindle_email],
        "subject": title,
        "text_body": f"Sent from Platform: {title}",
        "attachments": [{"filename": filename, "fileblob": file_b64, "mimetype": mime_type}]
    }
    try:
        req_body = json.dumps(email_payload).encode("utf-8")
        req = urllib.request.Request("https://api.smtp2go.com/v3/email/send", data=req_body, headers={"Content-Type": "application/json"})
        with urllib.request.urlopen(req, timeout=30) as resp:
            result = json.loads(resp.read())
        # SMTP2GO reports per-recipient success in data.succeeded.
        if result.get("data", {}).get("succeeded", 0) > 0:
            self._send_json({"success": True, "title": title, "sentTo": kindle_email, "format": ext.lstrip(".").upper()})
        else:
            self._send_json({"error": "Email send failed", "detail": result}, 500)
    except Exception as e:
        self._send_json({"error": f"SMTP2GO error: {str(e)}"}, 500)
def _handle_karakeep_save(self, body):
    """Save a URL to Karakeep as a bookmark.

    Request body: {"url": "..."}. Responds {"ok": true, "id": ...} on
    success; otherwise an error payload mirroring the upstream status.
    """
    if not KARAKEEP_API_KEY:
        self._send_json({"error": "Karakeep not configured"}, 502)
        return
    try:
        data = json.loads(body)
    except (ValueError, TypeError):  # narrowed from bare except: bad/empty body
        self._send_json({"error": "Invalid JSON"}, 400)
        return
    url = data.get("url", "")
    if not url:
        self._send_json({"error": "Missing url"}, 400)
        return
    payload = json.dumps({"type": "link", "url": url}).encode()
    headers = {
        "Authorization": f"Bearer {KARAKEEP_API_KEY}",
        "Content-Type": "application/json",
    }
    try:
        status, _, resp = proxy_request(
            f"{KARAKEEP_URL}/api/v1/bookmarks", "POST",
            headers, payload, timeout=15
        )
        if status in (200, 201):
            result = json.loads(resp)
            self._send_json({"ok": True, "id": result.get("id", "")})
        else:
            # Log a truncated upstream body to aid debugging.
            print(f"[Karakeep] Save failed ({status}): {resp[:200]}")
            self._send_json({"error": "Failed to save", "status": status}, status)
    except Exception as e:
        print(f"[Karakeep] Error: {e}")
        self._send_json({"error": str(e)}, 500)
def _handle_karakeep_delete(self, body):
    """Delete a bookmark from Karakeep.

    Request body: {"id": "..."}. Responds {"ok": true} on success;
    otherwise an error payload mirroring the upstream status.
    """
    if not KARAKEEP_API_KEY:
        self._send_json({"error": "Karakeep not configured"}, 502)
        return
    try:
        data = json.loads(body)
    except (ValueError, TypeError):  # narrowed from bare except: bad/empty body
        self._send_json({"error": "Invalid JSON"}, 400)
        return
    bookmark_id = data.get("id", "")
    if not bookmark_id:
        self._send_json({"error": "Missing id"}, 400)
        return
    headers = {
        "Authorization": f"Bearer {KARAKEEP_API_KEY}",
    }
    try:
        status, _, resp = proxy_request(
            f"{KARAKEEP_URL}/api/v1/bookmarks/{bookmark_id}", "DELETE",
            headers, timeout=10
        )
        # Karakeep may answer 200 or 204 for a successful delete.
        if status in (200, 204):
            self._send_json({"ok": True})
        else:
            self._send_json({"error": "Delete failed", "status": status}, status)
    except Exception as e:
        print(f"[Karakeep] Delete error: {e}")
        self._send_json({"error": str(e)}, 500)
def _handle_image_proxy(self):
    """Proxy external images to bypass hotlink protection (e.g. Reddit).

    Query string: ?url=<image URL>. Fetches the image with a browser-ish
    User-Agent and a same-origin Referer, then relays it with a one-day
    cache header.

    NOTE(review): this fetches arbitrary client-supplied URLs (an SSRF
    surface) — acceptable only while the gateway is on a private network.
    """
    qs = urllib.parse.urlparse(self.path).query
    params = urllib.parse.parse_qs(qs)
    url = params.get("url", [None])[0]
    if not url:
        self._send_json({"error": "Missing url parameter"}, 400)
        return
    try:
        origin = urllib.parse.urlparse(url)  # parsed once (original parsed twice)
        req = urllib.request.Request(url, headers={
            "User-Agent": "Mozilla/5.0 (compatible; PlatformProxy/1.0)",
            "Accept": "image/*,*/*",
            # Some CDNs require a same-site Referer to serve the image.
            "Referer": origin.scheme + "://" + origin.netloc + "/",
        })
        # Context manager closes the connection (the original leaked it).
        with urllib.request.urlopen(req, timeout=10, context=_ssl_ctx) as resp:
            body = resp.read()
            ct = resp.headers.get("Content-Type", "image/jpeg")
        self.send_response(200)
        self.send_header("Content-Type", ct)
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Cache-Control", "public, max-age=86400")
        self.end_headers()
        self.wfile.write(body)
    except Exception as e:
        print(f"[ImageProxy] Error fetching {url}: {e}")
        self._send_json({"error": "Failed to fetch image"}, 502)
def _handle_get_pinned(self, user):
    """Respond with the current user's pinned items, newest pin first."""
    db = get_db()
    rows = db.execute(
        "SELECT * FROM pinned_items WHERE user_id = ? ORDER BY pinned_at DESC",
        (user["id"],)
    ).fetchall()
    db.close()
    items = list(map(dict, rows))
    self._send_json({"pinned": items})
def _handle_dashboard(self, user):
    """Aggregate dashboard data from connected services — all fetches in parallel.

    For every enabled app, picks a fetcher by its dashboard_widget type,
    runs the fetchers concurrently, and responds with
    {"widgets": [...], "pinned": [...]} sorted by the apps' sort_order.
    Each widget entry carries connected/data flags so the UI can render
    disconnected or failed services gracefully.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed
    conn = get_db()
    apps = conn.execute("SELECT * FROM apps WHERE enabled = 1 ORDER BY sort_order").fetchall()
    conn.close()
    # Apps whose backends authenticate with a platform-level key, so a
    # missing per-user service token does not mean "not connected".
    SERVICE_LEVEL_AUTH = {"inventory", "reader", "books", "music", "budget"}
    widgets = []
    futures = {}  # Future -> app dict that produced it
    def fetch_trips(app, headers):
        # Trips widget: the single currently-active trip plus up to 3 upcoming.
        target = app["proxy_target"]
        widget_data = None
        s1, _, b1 = proxy_request(f"{target}/api/trips", "GET", headers, timeout=10)
        s2, _, b2 = proxy_request(f"{target}/api/stats", "GET", headers, timeout=10)
        if s1 == 200:
            trips = json.loads(b1).get("trips", [])
            now = datetime.now().strftime("%Y-%m-%d")
            # ISO date strings compare correctly lexicographically.
            active = [t for t in trips if t.get("start_date", "") <= now <= t.get("end_date", "")]
            upcoming = sorted(
                [t for t in trips if t.get("start_date", "") > now],
                key=lambda t: t.get("start_date", "")
            )[:3]
            widget_data = {"active": active[:1], "upcoming": upcoming}
            if s2 == 200:
                widget_data["stats"] = json.loads(b2)
        return widget_data
    def fetch_fitness(app, headers):
        # Fitness widget: today's calorie totals and goals for up to 4 users.
        target = app["proxy_target"]
        today = datetime.now().strftime("%Y-%m-%d")
        s1, _, b1 = proxy_request(f"{target}/api/users", "GET", headers, timeout=10)
        if s1 != 200:
            return None
        users = json.loads(b1)
        # Fetch all user data in parallel
        def get_user_data(u):
            s2, _, b2 = proxy_request(f"{target}/api/entries/totals?date={today}&user_id={u['id']}", "GET", headers, timeout=5)
            s3, _, b3 = proxy_request(f"{target}/api/goals/for-date?date={today}&user_id={u['id']}", "GET", headers, timeout=5)
            return {
                "user": u,
                "totals": json.loads(b2) if s2 == 200 else None,
                "goal": json.loads(b3) if s3 == 200 else None
            }
        with ThreadPoolExecutor(max_workers=4) as inner:
            people = list(inner.map(get_user_data, users[:4]))
        return {"people": people, "date": today}
    def fetch_inventory(app):
        # Inventory widget: count of records flagged for review.
        target = app["proxy_target"]
        s1, _, b1 = proxy_request(f"{target}/needs-review-count", "GET", {}, timeout=10)
        return json.loads(b1) if s1 == 200 else None
    def fetch_budget(app):
        # Budget widget: current-month summary. Best-effort — returns None
        # on any failure so the dashboard still renders.
        try:
            s, _, b = proxy_request(f"{app['proxy_target']}/summary", "GET", {}, timeout=8)
            if s == 200:
                data = json.loads(b)
                return {"count": data.get("transactionCount", 0), "totalBalance": data.get("totalBalanceDollars", 0), "spending": data.get("spendingDollars", 0), "income": data.get("incomeDollars", 0), "topCategories": data.get("topCategories", [])[:5], "month": data.get("month", "")}
        except:
            pass
        return None
    def fetch_reader(app):
        # Reader widget: total unread count plus the 10 newest unread articles.
        target = app["proxy_target"]
        hdrs = {"X-Auth-Token": MINIFLUX_API_KEY} if MINIFLUX_API_KEY else {}
        s1, _, b1 = proxy_request(f"{target}/v1/feeds/counters", "GET", hdrs, timeout=10)
        if s1 != 200:
            return None
        counters = json.loads(b1)
        total_unread = sum(counters.get("unreads", {}).values())
        entries = []
        s2, _, b2 = proxy_request(
            f"{target}/v1/entries?status=unread&direction=desc&order=published_at&limit=10",
            "GET",
            hdrs,
            timeout=10
        )
        if s2 == 200:
            payload = json.loads(b2)
            for entry in payload.get("entries", [])[:10]:
                feed = entry.get("feed") or {}
                entries.append({
                    "id": entry.get("id"),
                    "title": entry.get("title") or "Untitled article",
                    "url": entry.get("url"),
                    "published_at": entry.get("published_at"),
                    "author": entry.get("author") or "",
                    "reading_time": entry.get("reading_time") or 0,
                    "feed": {
                        "id": feed.get("id"),
                        "title": feed.get("title") or "Feed"
                    }
                })
        return {"unread": total_unread, "articles": entries}
    # Launch all widget fetches in parallel
    with ThreadPoolExecutor(max_workers=3) as executor:
        for app in apps:
            app = dict(app)  # sqlite3.Row -> plain dict for key access
            svc_token = get_service_token(user["id"], app["id"])
            is_service_level = app["id"] in SERVICE_LEVEL_AUTH
            if not svc_token and not is_service_level:
                # No way to authenticate — report the app as disconnected.
                widgets.append({"app": app["id"], "name": app["name"], "widget": app["dashboard_widget"], "connected": False, "data": None})
                continue
            headers = {"Authorization": f"Bearer {svc_token['auth_token']}"} if svc_token else {}
            # Trips can fall back to the platform-level API token.
            if not headers and app["id"] == "trips" and TRIPS_API_TOKEN:
                headers = {"Authorization": f"Bearer {TRIPS_API_TOKEN}"}
            # Dispatch by widget type; unknown widget types get no future
            # (and therefore no widget entry).
            if app["dashboard_widget"] == "upcoming_trips":
                futures[executor.submit(fetch_trips, app, headers)] = app
            elif app["dashboard_widget"] == "daily_calories":
                futures[executor.submit(fetch_fitness, app, headers)] = app
            elif app["dashboard_widget"] == "items_issues":
                futures[executor.submit(fetch_inventory, app)] = app
            elif app["dashboard_widget"] == "unread_count":
                futures[executor.submit(fetch_reader, app)] = app
            elif app["dashboard_widget"] == "budget_summary":
                futures[executor.submit(fetch_budget, app)] = app
        # Collect results as they finish; a fetcher exception yields a
        # connected-but-empty widget instead of failing the whole response.
        for future in as_completed(futures):
            app = futures[future]
            try:
                widget_data = future.result()
            except Exception as e:
                print(f"[Dashboard] Error fetching {app['id']}: {e}")
                widget_data = None
            widgets.append({"app": app["id"], "name": app["name"], "widget": app["dashboard_widget"], "connected": True, "data": widget_data})
    # Sort by original app order
    app_order = {dict(a)["id"]: dict(a)["sort_order"] for a in apps}
    widgets.sort(key=lambda w: app_order.get(w["app"], 99))
    # Include pinned items
    conn2 = get_db()
    pinned_rows = conn2.execute(
        "SELECT * FROM pinned_items WHERE user_id = ? ORDER BY pinned_at DESC",
        (user["id"],)
    ).fetchall()
    conn2.close()
    pinned = [dict(r) for r in pinned_rows]
    self._send_json({"widgets": widgets, "pinned": pinned})
# ── Command bar ──
def _handle_command(self, user, body):
    """Parse natural language command and execute it against the right service.

    Flow: gather context (the user's trips list and today's date) → ask
    OpenAI to translate the command into a {service, api_call} JSON
    action → execute that call against the matching backend. Fitness
    "quick_add" entries are rerouted through the food-resolution
    workflow first so logged foods link to the food database.
    """
    try:
        data = json.loads(body)
    except:
        self._send_json({"error": "Invalid JSON"}, 400)
        return
    command = data.get("command", "").strip()
    if not command:
        self._send_json({"error": "No command provided"}, 400)
        return
    if not OPENAI_API_KEY:
        self._send_json({"error": "AI not configured"}, 500)
        return
    # Get context: user's trips list and today's date
    trips_token = get_service_token(user["id"], "trips")
    # NOTE(review): fitness_token is fetched but never used below.
    fitness_token = get_service_token(user["id"], "fitness")
    trips_context = ""
    if trips_token:
        s, _, b = proxy_request(f"{TRIPS_URL}/api/trips", "GET",
                                {"Authorization": f"Bearer {trips_token['auth_token']}"}, timeout=5)
        if s == 200:
            trips_list = json.loads(b).get("trips", [])
            # Give the model trip names/ids so it can resolve fuzzy references.
            trips_context = "Available trips: " + ", ".join(
                f'"{t["name"]}" (id={t["id"]}, {t.get("start_date","")} to {t.get("end_date","")})' for t in trips_list
            )
    today = datetime.now().strftime("%Y-%m-%d")
    # System prompt instructing the model to emit exactly one JSON action.
    # ({{ / }} are f-string escapes for literal braces in the examples.)
    system_prompt = f"""You are a command executor for a personal platform with two services: Trips and Fitness.
Today's date: {today}
Current user: {user["display_name"]} (id={user["id"]})
{trips_context}
Parse the user's natural language command and return a JSON action to execute.
Return ONLY valid JSON with this structure:
{{
"service": "trips" | "fitness",
"action": "description of what was done",
"api_call": {{
"method": "POST" | "GET",
"path": "/api/...",
"body": {{ ... }}
}}
}}
AVAILABLE ACTIONS:
For Trips service:
- Add location/activity: POST /api/location with {{"trip_id": "...", "name": "...", "category": "attraction|restaurant|cafe|hike|shopping", "visit_date": "YYYY-MM-DD", "start_time": "YYYY-MM-DDTHH:MM:SS", "description": "..."}}
- Add lodging: POST /api/lodging with {{"trip_id": "...", "name": "...", "type": "hotel", "check_in": "YYYY-MM-DDTHH:MM:SS", "check_out": "YYYY-MM-DDTHH:MM:SS", "location": "..."}}
- Add transportation: POST /api/transportation with {{"trip_id": "...", "name": "...", "type": "plane|car|train", "from_location": "...", "to_location": "...", "date": "YYYY-MM-DDTHH:MM:SS", "flight_number": "..."}}
- Add note: POST /api/note with {{"trip_id": "...", "name": "...", "content": "...", "date": "YYYY-MM-DD"}}
- Create trip: POST /api/trip with {{"name": "...", "start_date": "YYYY-MM-DD", "end_date": "YYYY-MM-DD", "description": "..."}}
For Fitness service:
- Log food (quick add): POST /api/entries with {{"entry_type": "quick_add", "meal_type": "breakfast|lunch|dinner|snack", "snapshot_food_name": "...", "snapshot_quantity": number, "snapshot_calories": number, "snapshot_protein": number, "snapshot_carbs": number, "snapshot_fat": number, "date": "YYYY-MM-DD"}}
IMPORTANT for snapshot_food_name: use just the FOOD NAME without the quantity (e.g. "mini cinnabon" not "1/2 mini cinnabon"). Put the numeric quantity in snapshot_quantity (e.g. 0.5 for "half", 0.75 for "3/4").
- Search food first then log: If you know exact nutrition, use quick_add. Common foods: banana (105cal, 1g protein, 27g carbs, 0g fat), apple (95cal), egg (78cal, 6g protein, 1g carbs, 5g fat), chicken breast (165cal, 31g protein, 0g carbs, 3.6g fat), rice 1 cup (206cal, 4g protein, 45g carbs, 0g fat), bread slice (79cal, 3g protein, 15g carbs, 1g fat), milk 1 cup (149cal, 8g protein, 12g carbs, 8g fat), coffee black (2cal), oatmeal 1 cup (154cal, 5g protein, 27g carbs, 3g fat).
For Inventory service (searches NocoDB):
- Search items: GET /search-records?q=... (note: NO /api prefix for inventory)
- If user asks to search inventory, find items, look up orders, check serial numbers use service "inventory" with GET method
Guidelines:
- For food logging, default meal_type to "snack" if not specified
- For food logging, use today's date if not specified
- For trips, match the trip name fuzzy (e.g. "colorado" matches "Colorado")
- Use the trip_id from the available trips list
- Default check-in to 3PM, check-out to 11AM for hotels
- Estimate reasonable nutrition values for foods you know
- For inventory searches, use service "inventory" with method GET and path /search-records?q=searchterm
- IMPORTANT: Inventory paths do NOT start with /api/ use bare paths like /search-records"""
    # Call OpenAI
    try:
        ai_body = json.dumps({
            "model": OPENAI_MODEL,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": command}
            ],
            "max_completion_tokens": 1000,
            "temperature": 0.1
        }).encode()
        req = urllib.request.Request(
            "https://api.openai.com/v1/chat/completions",
            data=ai_body,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {OPENAI_API_KEY}"
            },
            method="POST"
        )
        with urllib.request.urlopen(req, context=_ssl_ctx, timeout=30) as resp:
            ai_result = json.loads(resp.read().decode())
        ai_text = ai_result["choices"][0]["message"]["content"]
        # Parse AI response
        # Strip optional markdown code fences the model may wrap JSON in.
        ai_text = ai_text.strip()
        if ai_text.startswith("```json"):
            ai_text = ai_text[7:]
        if ai_text.startswith("```"):
            ai_text = ai_text[3:]
        if ai_text.endswith("```"):
            ai_text = ai_text[:-3]
        parsed = json.loads(ai_text.strip())
    except Exception as e:
        self._send_json({"error": f"AI parsing failed: {e}"}, 500)
        return
    # Execute the action
    service = parsed.get("service", "")
    api_call = parsed.get("api_call", {})
    action_desc = parsed.get("action", "Unknown action")
    if not api_call or not api_call.get("path"):
        self._send_json({"error": "AI could not determine action", "parsed": parsed}, 400)
        return
    # Get service token (inventory authenticates at the service level).
    svc_token = get_service_token(user["id"], service)
    if not svc_token and service not in ("inventory",):
        self._send_json({"error": f"{service} service not connected"}, 400)
        return
    # Build request to service
    target = SERVICE_MAP.get(service)
    if not target:
        self._send_json({"error": f"Unknown service: {service}"}, 400)
        return
    method = api_call.get("method", "POST")
    path = api_call.get("path", "")
    body_data = api_call.get("body", {})
    headers = {"Content-Type": "application/json"}
    if svc_token:
        headers["Authorization"] = f"Bearer {svc_token['auth_token']}"
    # For fitness food logging, use the resolve workflow instead of quick_add
    if service == "fitness" and body_data.get("entry_type") == "quick_add":
        food_name = body_data.get("snapshot_food_name", "")
        meal_type = body_data.get("meal_type", "snack")
        entry_date = body_data.get("date", datetime.now().strftime("%Y-%m-%d"))
        # AI-parsed quantity from the command (more reliable than resolve engine's parser)
        ai_quantity = body_data.get("snapshot_quantity")
        if food_name and svc_token:
            try:
                # Step 1: Resolve the food through the smart resolution engine
                resolve_body = json.dumps({
                    "raw_phrase": food_name,
                    "meal_type": meal_type,
                    "entry_date": entry_date,
                    "source": "command_bar"
                }).encode()
                rs, _, rb = proxy_request(
                    f"{target}/api/foods/resolve", "POST",
                    {**headers, "Content-Type": "application/json"}, resolve_body, timeout=15
                )
                if rs == 200:
                    resolved = json.loads(rb)
                    res_type = resolved.get("resolution_type")
                    matched = resolved.get("matched_food")
                    parsed_food = resolved.get("parsed", {})
                    if matched and res_type in ("matched", "ai_estimated", "confirm"):
                        food_id = matched.get("id")
                        # For new foods (ai_estimated): serving was created for the exact request
                        # e.g. "3/4 cup biryani" → serving = "3/4 cup" at 300 cal → qty = 1.0
                        # For existing foods (matched/confirm): use AI quantity as multiplier
                        # e.g. "half cinnabon" → existing "1 mini cinnabon" at 350 cal → qty = 0.5
                        if res_type == "ai_estimated":
                            quantity = 1.0
                        elif ai_quantity and ai_quantity != 1:
                            quantity = ai_quantity
                        else:
                            quantity = 1.0
                        # Replace the quick_add payload with a linked food entry.
                        body_data = {
                            "food_id": food_id,
                            "meal_type": meal_type,
                            "quantity": quantity,
                            "date": entry_date,
                            "entry_type": "food"
                        }
                        # Use matching serving if available
                        servings = matched.get("servings", [])
                        if servings:
                            body_data["serving_id"] = servings[0].get("id")
                        # Use snapshot name override if provided (includes modifiers)
                        if resolved.get("snapshot_name_override"):
                            body_data["snapshot_food_name_override"] = resolved["snapshot_name_override"]
                        if resolved.get("note"):
                            body_data["note"] = resolved["note"]
                        action_desc = f"Logged {matched.get('name', food_name)} for {meal_type}"
                        if res_type == "ai_estimated":
                            action_desc += " (AI estimated, added to food database)"
                        # Auto-fetch image if food doesn't have one
                        if not matched.get("image_path") and food_id:
                            try:
                                img_body = json.dumps({"query": matched.get("name", food_name) + " food"}).encode()
                                si, _, sb = proxy_request(
                                    f"{target}/api/images/search", "POST",
                                    {**headers, "Content-Type": "application/json"}, img_body, timeout=8
                                )
                                if si == 200:
                                    imgs = json.loads(sb).get("images", [])
                                    if imgs:
                                        img_url = imgs[0].get("url") or imgs[0].get("thumbnail")
                                        if img_url:
                                            proxy_request(
                                                f"{target}/api/foods/{food_id}/image", "POST",
                                                {**headers, "Content-Type": "application/json"},
                                                json.dumps({"url": img_url}).encode(), timeout=10
                                            )
                            except Exception as e:
                                print(f"[Command] Auto-image fetch failed: {e}")
            except Exception as e:
                # Best-effort: on any resolve failure, body_data still holds
                # the original quick_add payload and is sent as-is below.
                print(f"[Command] Food resolve failed, falling back to quick_add: {e}")
    req_body = json.dumps(body_data).encode() if body_data else None
    status, resp_headers, resp_body = proxy_request(f"{target}{path}", method, headers, req_body, timeout=15)
    try:
        result = json.loads(resp_body)
    except:
        # Non-JSON backend response: return a truncated raw preview.
        result = {"raw": resp_body.decode()[:200]}
    self._send_json({
        "success": status < 400,
        "action": action_desc,
        "service": service,
        "status": status,
        "result": result
    })
# ── Service proxy ──
def _proxy(self, method, path, body=None):
    """Forward an incoming request to the backend service owning `path`.

    Resolves the target service, injects downstream credentials
    (platform-level API keys for reader/trips, else the user's stored
    service token, else the caller's own Authorization header), then
    relays the backend's response with a minimal header subset.
    """
    user = self._get_user()
    service_id, target, backend_path = resolve_service(path)
    if not target:
        self._send_json({"error": "Unknown service"}, 404)
        return
    # Downstream headers: carry over the request's content type, if any.
    headers = {}
    content_type = self.headers.get("Content-Type")
    if content_type:
        headers["Content-Type"] = content_type
    # Credential injection, in priority order.
    if service_id == "reader" and MINIFLUX_API_KEY:
        headers["X-Auth-Token"] = MINIFLUX_API_KEY
    elif service_id == "trips" and TRIPS_API_TOKEN:
        headers["Authorization"] = f"Bearer {TRIPS_API_TOKEN}"
    elif user:
        svc_token = get_service_token(user["id"], service_id)
        if svc_token:
            auth_kind = svc_token["auth_type"]
            if auth_kind == "bearer":
                headers["Authorization"] = f"Bearer {svc_token['auth_token']}"
            elif auth_kind == "cookie":
                headers["Cookie"] = f"session={svc_token['auth_token']}"
    # No stored token? Pass the caller's own Authorization through so
    # direct API-key usage still works.
    if "Authorization" not in headers:
        caller_auth = self.headers.get("Authorization")
        if caller_auth:
            headers["Authorization"] = caller_auth
    # Forward other useful headers verbatim.
    for name in ("X-API-Key", "X-Telegram-User-Id"):
        value = self.headers.get(name)
        if value:
            headers[name] = value
    # Rebuild the backend URL, preserving the original query string.
    _, _, query = self.path.partition("?")
    full_url = f"{target}{backend_path}"
    if query:
        full_url += f"?{query}"
    status, resp_headers, resp_body = proxy_request(full_url, method, headers, body)
    # Relay status + body; only content-describing headers pass through.
    self.send_response(status)
    for key, value in resp_headers.items():
        if key.lower() in ("content-type", "content-disposition"):
            self.send_header(key, value)
    self.send_header("Content-Length", len(resp_body))
    self.end_headers()
    self.wfile.write(resp_body)
# ── Main ──
def main():
    """Initialize the database and service map, then serve HTTP forever."""
    init_db()
    load_service_map()
    print(f"[Gateway] Services: {SERVICE_MAP}")
    print(f"[Gateway] Listening on port {PORT}")
    httpd = HTTPServer(("0.0.0.0", PORT), GatewayHandler)
    httpd.serve_forever()
# Run the gateway only when executed as a script, not on import.
if __name__ == "__main__":
    main()