Changelog:
- Switch HTTPServer to ThreadingHTTPServer (concurrent request handling)
- Replace SHA-256 password hashing with bcrypt (auth.py, database.py)
- Add bcrypt to Dockerfile
- Move qBittorrent env vars to config.py
- Move _booklore_token state out of config into booklore.py
- Remove dead fitness_token variable in command.py
- Fix OpenAI call to use default SSL context instead of no-verify ctx
- Log swallowed budget fetch error in dashboard.py
"""
|
|
Platform Gateway — Dashboard, apps, user profile, connections, pinning handlers.
|
|
"""
|
|
|
|
import json
|
|
from datetime import datetime
|
|
|
|
from config import TRIPS_API_TOKEN, MINIFLUX_API_KEY
|
|
from database import get_db
|
|
from sessions import get_service_token, set_service_token, delete_service_token
|
|
import proxy as _proxy_module
|
|
from proxy import proxy_request
|
|
|
|
|
|
def handle_apps(handler, user):
    """Return all enabled apps, each annotated with the user's connection status."""
    db = get_db()
    app_rows = db.execute(
        "SELECT * FROM apps WHERE enabled = 1 ORDER BY sort_order"
    ).fetchall()
    connection_rows = db.execute(
        "SELECT service FROM service_connections WHERE user_id = ?",
        (user["id"],),
    ).fetchall()
    db.close()

    connected_services = {row["service"] for row in connection_rows}

    payload = []
    for row in app_rows:
        entry = dict(row)
        entry["connected"] = entry["id"] in connected_services
        # Internal proxy URLs are never exposed to clients.
        entry.pop("proxy_target")
        payload.append(entry)

    handler._send_json({"apps": payload})
|
def handle_me(handler, user):
    """Return the current user's profile together with their service connections."""
    db = get_db()
    rows = db.execute(
        "SELECT service, auth_type, created_at FROM service_connections WHERE user_id = ?",
        (user["id"],),
    ).fetchall()
    db.close()

    profile = {
        "id": user["id"],
        "username": user["username"],
        "display_name": user["display_name"],
        "created_at": user["created_at"],
    }
    handler._send_json({
        "user": profile,
        "connections": [dict(row) for row in rows],
    })
|
def handle_connections(handler, user):
    """List the user's service connections (service, auth_type, created_at)."""
    db = get_db()
    rows = db.execute(
        "SELECT service, auth_type, created_at FROM service_connections WHERE user_id = ?",
        (user["id"],),
    ).fetchall()
    db.close()
    handler._send_json({"connections": [dict(row) for row in rows]})
|
def handle_set_connection(handler, user, body):
    """Connect or disconnect a user's credential for a proxied service.

    Body: {"service": str, "action": "connect"|"disconnect",
           "token": str, "auth_type": str}.

    On connect, the token is smoke-tested against the target service first;
    only an explicit 401 from the service rejects it — other statuses
    (including network failure) are accepted best-effort, since the service
    may simply be down.
    """
    try:
        data = json.loads(body)
    except Exception:
        handler._send_json({"error": "Invalid JSON"}, 400)
        return

    service = data.get("service", "")
    action = data.get("action", "connect")  # connect or disconnect

    if action == "disconnect":
        delete_service_token(user["id"], service)
        handler._send_json({"success": True})
        return

    auth_token = data.get("token", "")
    auth_type = data.get("auth_type", "bearer")

    if not service or not auth_token:
        handler._send_json({"error": "service and token required"}, 400)
        return

    # Validate the service name against the known proxy targets.
    target = _proxy_module.SERVICE_MAP.get(service)
    if not target:
        handler._send_json({"error": f"Unknown service: {service}"}, 400)
        return

    # Cheap authenticated endpoint per service; generic health check otherwise.
    test_paths = {"trips": "/api/trips", "fitness": "/api/users"}
    test_url = target + test_paths.get(service, "/api/health")
    headers = {"Authorization": f"Bearer {auth_token}"}

    status, _, _ = proxy_request(test_url, "GET", headers, timeout=10)
    if status == 401:
        handler._send_json({"error": "Invalid token for this service"}, 401)
        return

    set_service_token(user["id"], service, auth_token, auth_type)
    handler._send_json({"success": True})
|
def handle_pin(handler, user, body):
    """Pin (or re-pin) an item to the user's dashboard.

    Body: {"item_id": any, "item_name": str, "service": str}.
    Re-pinning an existing item refreshes its name and bumps pinned_at.
    """
    try:
        data = json.loads(body)
    except Exception:
        handler._send_json({"error": "Invalid JSON"}, 400)
        return

    item_id = str(data.get("item_id", ""))
    item_name = data.get("item_name", "")
    service = data.get("service", "inventory")
    if not item_id:
        handler._send_json({"error": "item_id required"}, 400)
        return

    conn = get_db()
    # SQLite upsert; `excluded` refers to the row that failed to insert, so
    # item_name binds once instead of being passed positionally twice.
    conn.execute("""
        INSERT INTO pinned_items (user_id, service, item_id, item_name)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(user_id, service, item_id)
        DO UPDATE SET item_name = excluded.item_name, pinned_at = CURRENT_TIMESTAMP
    """, (user["id"], service, item_id, item_name))
    conn.commit()
    conn.close()
    handler._send_json({"success": True})
|
def handle_unpin(handler, user, body):
    """Remove a pinned item from the user's dashboard (no-op if absent)."""
    try:
        payload = json.loads(body)
    except Exception:
        handler._send_json({"error": "Invalid JSON"}, 400)
        return

    item_id = str(payload.get("item_id", ""))
    service = payload.get("service", "inventory")

    db = get_db()
    db.execute(
        "DELETE FROM pinned_items WHERE user_id = ? AND service = ? AND item_id = ?",
        (user["id"], service, item_id),
    )
    db.commit()
    db.close()
    handler._send_json({"success": True})
|
def handle_get_pinned(handler, user):
    """Return the user's pinned items, most recently pinned first."""
    db = get_db()
    pinned_rows = db.execute(
        "SELECT * FROM pinned_items WHERE user_id = ? ORDER BY pinned_at DESC",
        (user["id"],),
    ).fetchall()
    db.close()
    handler._send_json({"pinned": [dict(row) for row in pinned_rows]})
|
def handle_dashboard(handler, user):
    """Aggregate dashboard data from connected services -- all fetches in parallel.

    Builds one widget entry per enabled app.  Apps whose auth is service-level
    (shared credential or none) are always fetched; per-user apps are fetched
    only when the user has a stored service token.  Fetch failures degrade to
    a widget with data=None rather than failing the whole response.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    conn = get_db()
    apps = conn.execute("SELECT * FROM apps WHERE enabled = 1 ORDER BY sort_order").fetchall()
    conn.close()

    # Services authenticated at the service level rather than per user.
    SERVICE_LEVEL_AUTH = {"inventory", "reader", "books", "music", "budget"}
    widgets = []
    futures = {}  # Future -> app row, so results can be matched back to apps

    def fetch_trips(app, headers):
        # Trips widget: at most one active trip plus the next 3 upcoming,
        # with overall stats attached when available.
        target = app["proxy_target"]
        widget_data = None
        s1, _, b1 = proxy_request(f"{target}/api/trips", "GET", headers, timeout=10)
        s2, _, b2 = proxy_request(f"{target}/api/stats", "GET", headers, timeout=10)
        if s1 == 200:
            trips = json.loads(b1).get("trips", [])
            now = datetime.now().strftime("%Y-%m-%d")
            # Dates are ISO (YYYY-MM-DD) strings, so lexicographic compare works.
            active = [t for t in trips if t.get("start_date", "") <= now <= t.get("end_date", "")]
            upcoming = sorted(
                [t for t in trips if t.get("start_date", "") > now],
                key=lambda t: t.get("start_date", "")
            )[:3]
            widget_data = {"active": active[:1], "upcoming": upcoming}
            if s2 == 200:
                widget_data["stats"] = json.loads(b2)
        return widget_data

    def fetch_fitness(app, headers):
        # Fitness widget: today's totals and goal for up to 4 users.
        target = app["proxy_target"]
        today = datetime.now().strftime("%Y-%m-%d")
        s1, _, b1 = proxy_request(f"{target}/api/users", "GET", headers, timeout=10)
        if s1 != 200:
            return None
        users = json.loads(b1)

        # Fetch all user data in parallel
        def get_user_data(u):
            s2, _, b2 = proxy_request(f"{target}/api/entries/totals?date={today}&user_id={u['id']}", "GET", headers, timeout=5)
            s3, _, b3 = proxy_request(f"{target}/api/goals/for-date?date={today}&user_id={u['id']}", "GET", headers, timeout=5)
            return {
                "user": u,
                "totals": json.loads(b2) if s2 == 200 else None,
                "goal": json.loads(b3) if s3 == 200 else None
            }

        # Inner pool nested inside the outer widget pool; bounded at 4 workers.
        with ThreadPoolExecutor(max_workers=4) as inner:
            people = list(inner.map(get_user_data, users[:4]))
        return {"people": people, "date": today}

    def fetch_inventory(app):
        # Inventory widget: count of items needing review (no auth headers sent).
        target = app["proxy_target"]
        s1, _, b1 = proxy_request(f"{target}/needs-review-count", "GET", {}, timeout=10)
        return json.loads(b1) if s1 == 200 else None

    def fetch_budget(app):
        # Budget widget: monthly summary. Errors are logged, never raised.
        try:
            s, _, b = proxy_request(f"{app['proxy_target']}/summary", "GET", {}, timeout=8)
            if s == 200:
                data = json.loads(b)
                return {"count": data.get("transactionCount", 0), "totalBalance": data.get("totalBalanceDollars", 0), "spending": data.get("spendingDollars", 0), "income": data.get("incomeDollars", 0), "topCategories": data.get("topCategories", [])[:5], "month": data.get("month", "")}
        except Exception as e:
            print(f"[Dashboard] Budget fetch error: {e}")
        return None

    def fetch_reader(app):
        # Reader widget: total unread count plus the 10 newest unread articles.
        target = app["proxy_target"]
        hdrs = {"X-Auth-Token": MINIFLUX_API_KEY} if MINIFLUX_API_KEY else {}
        s1, _, b1 = proxy_request(f"{target}/v1/feeds/counters", "GET", hdrs, timeout=10)
        if s1 != 200:
            return None

        counters = json.loads(b1)
        total_unread = sum(counters.get("unreads", {}).values())

        entries = []
        s2, _, b2 = proxy_request(
            f"{target}/v1/entries?status=unread&direction=desc&order=published_at&limit=10",
            "GET",
            hdrs,
            timeout=10
        )
        if s2 == 200:
            payload = json.loads(b2)
            for entry in payload.get("entries", [])[:10]:
                feed = entry.get("feed") or {}
                entries.append({
                    "id": entry.get("id"),
                    "title": entry.get("title") or "Untitled article",
                    "url": entry.get("url"),
                    "published_at": entry.get("published_at"),
                    "author": entry.get("author") or "",
                    "reading_time": entry.get("reading_time") or 0,
                    "feed": {
                        "id": feed.get("id"),
                        "title": feed.get("title") or "Feed"
                    }
                })

        return {"unread": total_unread, "articles": entries}

    # Launch all widget fetches in parallel
    with ThreadPoolExecutor(max_workers=3) as executor:
        for app in apps:
            app = dict(app)
            svc_token = get_service_token(user["id"], app["id"])
            is_service_level = app["id"] in SERVICE_LEVEL_AUTH

            if not svc_token and not is_service_level:
                # Per-user app the user hasn't connected: disconnected tile, no fetch.
                widgets.append({"app": app["id"], "name": app["name"], "widget": app["dashboard_widget"], "connected": False, "data": None})
                continue

            headers = {"Authorization": f"Bearer {svc_token['auth_token']}"} if svc_token else {}
            # Trips falls back to a shared API token when the user has none.
            if not headers and app["id"] == "trips" and TRIPS_API_TOKEN:
                headers = {"Authorization": f"Bearer {TRIPS_API_TOKEN}"}

            # Dispatch on widget type; unknown widget types are silently skipped.
            if app["dashboard_widget"] == "upcoming_trips":
                futures[executor.submit(fetch_trips, app, headers)] = app
            elif app["dashboard_widget"] == "daily_calories":
                futures[executor.submit(fetch_fitness, app, headers)] = app
            elif app["dashboard_widget"] == "items_issues":
                futures[executor.submit(fetch_inventory, app)] = app
            elif app["dashboard_widget"] == "unread_count":
                futures[executor.submit(fetch_reader, app)] = app
            elif app["dashboard_widget"] == "budget_summary":
                futures[executor.submit(fetch_budget, app)] = app

        for future in as_completed(futures):
            app = futures[future]
            try:
                widget_data = future.result()
            except Exception as e:
                # A failed fetch degrades to an empty widget, not a 500.
                print(f"[Dashboard] Error fetching {app['id']}: {e}")
                widget_data = None
            widgets.append({"app": app["id"], "name": app["name"], "widget": app["dashboard_widget"], "connected": True, "data": widget_data})

    # Sort by original app order
    app_order = {dict(a)["id"]: dict(a)["sort_order"] for a in apps}
    widgets.sort(key=lambda w: app_order.get(w["app"], 99))

    # Include pinned items
    conn2 = get_db()
    pinned_rows = conn2.execute(
        "SELECT * FROM pinned_items WHERE user_id = ? ORDER BY pinned_at DESC",
        (user["id"],)
    ).fetchall()
    conn2.close()
    pinned = [dict(r) for r in pinned_rows]

    handler._send_json({"widgets": widgets, "pinned": pinned})