# NOTE: removed a non-code scraper artifact here (repository file-browser
# header: "platform/gateway/dashboard.py -- 340 lines, 13 KiB, Python").
"""
Platform Gateway: dashboard, apps, user profile, connections, and pinning handlers.
"""
import json
from datetime import datetime
from config import TRIPS_API_TOKEN, MINIFLUX_API_KEY, INVENTORY_SERVICE_API_KEY, BUDGET_SERVICE_API_KEY
from database import get_db
from sessions import get_service_token, set_service_token, delete_service_token
import proxy as _proxy_module
from proxy import proxy_request
def handle_apps(handler, user):
    """Return the enabled apps list, annotated with the user's connection status.

    Responds with {"apps": [...]}; each entry mirrors an `apps` row but drops
    `proxy_target` (internal URL) and adds a `connected` boolean derived from
    the user's rows in `service_connections`.
    """
    conn = get_db()
    try:
        rows = conn.execute(
            "SELECT * FROM apps WHERE enabled = 1 ORDER BY sort_order"
        ).fetchall()
        # Check which services the user has connected
        connections = conn.execute(
            "SELECT service FROM service_connections WHERE user_id = ?",
            (user["id"],),
        ).fetchall()
    finally:
        # Close even if a query raises, so the handle never leaks.
        conn.close()
    connected = {r["service"] for r in connections}
    apps = []
    for row in rows:
        app = dict(row)
        app["connected"] = app["id"] in connected
        # Don't expose internal URLs; pop() (vs. del) tolerates the column
        # ever being absent from the schema.
        app.pop("proxy_target", None)
        apps.append(app)
    handler._send_json({"apps": apps})
def handle_me(handler, user):
    """Return the authenticated user's profile plus their service connections.

    Responds with {"user": {...}, "connections": [...]}; the user sub-object
    exposes only id/username/display_name/created_at.
    """
    conn = get_db()
    try:
        connections = conn.execute(
            "SELECT service, auth_type, created_at FROM service_connections WHERE user_id = ?",
            (user["id"],),
        ).fetchall()
    finally:
        # Close even if the query raises, so the handle never leaks.
        conn.close()
    handler._send_json({
        "user": {
            "id": user["id"],
            "username": user["username"],
            "display_name": user["display_name"],
            "created_at": user["created_at"],
        },
        "connections": [dict(c) for c in connections],
    })
def handle_connections(handler, user):
    """Return the user's service connections (service, auth_type, created_at).

    Same connection query as handle_me, without the profile wrapper.
    """
    conn = get_db()
    try:
        connections = conn.execute(
            "SELECT service, auth_type, created_at FROM service_connections WHERE user_id = ?",
            (user["id"],),
        ).fetchall()
    finally:
        # Close even if the query raises, so the handle never leaks.
        conn.close()
    handler._send_json({"connections": [dict(c) for c in connections]})
def handle_set_connection(handler, user, body):
    """Connect or disconnect a third-party service for the user.

    Body: {"service": ..., "action": "connect"|"disconnect",
           "token": ..., "auth_type": ...} (action defaults to "connect",
    auth_type to "bearer"). On connect, the token is probed against a
    protected endpoint of the target service; a 401 from the probe rejects
    the token, otherwise it is stored via set_service_token.
    """
    try:
        data = json.loads(body)
    except Exception:
        handler._send_json({"error": "Invalid JSON"}, 400)
        return
    if not isinstance(data, dict):
        # A JSON array/scalar would otherwise raise AttributeError on .get()
        # below and surface as a 500.
        handler._send_json({"error": "Invalid JSON"}, 400)
        return
    service = data.get("service", "")
    action = data.get("action", "connect")  # connect or disconnect
    if action == "disconnect":
        delete_service_token(user["id"], service)
        handler._send_json({"success": True})
        return
    auth_token = data.get("token", "")
    auth_type = data.get("auth_type", "bearer")
    if not service or not auth_token:
        handler._send_json({"error": "service and token required"}, 400)
        return
    target = _proxy_module.SERVICE_MAP.get(service)
    if not target:
        handler._send_json({"error": f"Unknown service: {service}"}, 400)
        return
    # Validate the token against a protected endpoint (not a health check),
    # so a garbage token actually produces a 401. One (path, headers) probe
    # per service keeps the per-service auth schemes in a single table.
    probes = {
        "trips": ("/api/trips", {"Authorization": f"Bearer {auth_token}"}),
        "fitness": ("/api/user", {"Authorization": f"Bearer {auth_token}"}),
        "inventory": ("/summary", {"X-API-Key": auth_token}),
        "budget": ("/summary", {"X-API-Key": auth_token}),
        "reader": ("/v1/feeds/counters", {"X-Auth-Token": auth_token}),
    }
    probe = probes.get(service)
    if probe is None:
        # Known to the proxy but with no validation rule -- reject rather
        # than fall back to an unauthenticated health check.
        handler._send_json({"error": f"Cannot validate token for service: {service}"}, 400)
        return
    path, headers = probe
    status, _, _ = proxy_request(f"{target}{path}", "GET", headers, timeout=10)
    # NOTE(review): only 401 rejects; a 403 or an unreachable service still
    # stores the token -- presumably deliberate best-effort behavior, confirm.
    if status == 401:
        handler._send_json({"error": "Invalid token for this service"}, 401)
        return
    set_service_token(user["id"], service, auth_token, auth_type)
    handler._send_json({"success": True})
def handle_pin(handler, user, body):
    """Pin (or re-pin) an item to the user's dashboard.

    Body: {"item_id": ..., "item_name": ..., "service": ...}; service
    defaults to "inventory". Re-pinning an existing (user, service, item)
    refreshes its name and pinned_at timestamp via upsert.
    """
    try:
        data = json.loads(body)
    except Exception:
        handler._send_json({"error": "Invalid JSON"}, 400)
        return
    if not isinstance(data, dict):
        # A JSON array/scalar would otherwise raise AttributeError on .get().
        handler._send_json({"error": "Invalid JSON"}, 400)
        return
    # `or ""` keeps an explicit JSON null from becoming the truthy string
    # "None" and slipping past the required-field check below.
    item_id = str(data.get("item_id") or "")
    item_name = data.get("item_name", "")
    service = data.get("service", "inventory")
    if not item_id:
        handler._send_json({"error": "item_id required"}, 400)
        return
    conn = get_db()
    try:
        # Upsert: `excluded` is the row that failed to insert, so the new
        # name is bound once instead of being passed as a second parameter.
        conn.execute("""
            INSERT INTO pinned_items (user_id, service, item_id, item_name)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(user_id, service, item_id)
            DO UPDATE SET item_name = excluded.item_name, pinned_at = CURRENT_TIMESTAMP
        """, (user["id"], service, item_id, item_name))
        conn.commit()
    finally:
        conn.close()
    handler._send_json({"success": True})
def handle_unpin(handler, user, body):
    """Remove a pinned item from the user's dashboard.

    Body: {"item_id": ..., "service": ...}; service defaults to "inventory".
    Succeeds even if the item was not pinned (DELETE matches no rows).
    """
    try:
        data = json.loads(body)
    except Exception:
        handler._send_json({"error": "Invalid JSON"}, 400)
        return
    if not isinstance(data, dict):
        # A JSON array/scalar would otherwise raise AttributeError on .get().
        handler._send_json({"error": "Invalid JSON"}, 400)
        return
    # `or ""` keeps an explicit JSON null from becoming the string "None".
    item_id = str(data.get("item_id") or "")
    service = data.get("service", "inventory")
    if not item_id:
        # Require item_id, consistent with handle_pin; a blank id would
        # silently match no rows.
        handler._send_json({"error": "item_id required"}, 400)
        return
    conn = get_db()
    try:
        conn.execute(
            "DELETE FROM pinned_items WHERE user_id = ? AND service = ? AND item_id = ?",
            (user["id"], service, item_id),
        )
        conn.commit()
    finally:
        conn.close()
    handler._send_json({"success": True})
def handle_get_pinned(handler, user):
    """Return the user's pinned items, most recently pinned first."""
    conn = get_db()
    try:
        rows = conn.execute(
            "SELECT * FROM pinned_items WHERE user_id = ? ORDER BY pinned_at DESC",
            (user["id"],),
        ).fetchall()
    finally:
        # Close even if the query raises, so the handle never leaks.
        conn.close()
    handler._send_json({"pinned": [dict(r) for r in rows]})
# Single-slot cache for the aggregated dashboard payload (see
# handle_dashboard). Holds only the most recent user's result.
# NOTE(review): one slot means two users alternating requests defeat the
# cache entirely -- consider keying by user_id if that matters.
import time as _time
_dashboard_cache = {"data": None, "expires": 0, "user_id": None}
_DASHBOARD_TTL = 30  # seconds
def handle_dashboard(handler, user):
    """Aggregate dashboard data from connected services -- all fetches in parallel.
    Results cached for 30 seconds per user to avoid repeated slow aggregation.

    Responds with {"widgets": [...], "pinned": [...]}: one widget entry per
    enabled app (connected or not), sorted by the app's sort_order, plus the
    user's pinned items.
    """
    # Return cached if fresh and same user
    if (_dashboard_cache["data"] and _dashboard_cache["user_id"] == user["id"]
            and _time.time() < _dashboard_cache["expires"]):
        handler._send_json(_dashboard_cache["data"])
        return
    from concurrent.futures import ThreadPoolExecutor, as_completed
    conn = get_db()
    apps = conn.execute("SELECT * FROM apps WHERE enabled = 1 ORDER BY sort_order").fetchall()
    conn.close()
    # Services authenticated with a platform-level key rather than a
    # per-user token (see the per-service fetchers below).
    SERVICE_LEVEL_AUTH = {"inventory", "reader", "books", "music", "budget"}
    widgets = []
    futures = {}  # Future -> app dict, so results can be attributed on completion

    def fetch_trips(app, headers):
        # Trips widget: at most one active trip, next 3 upcoming, plus stats.
        target = app["proxy_target"]
        widget_data = None
        s1, _, b1 = proxy_request(f"{target}/api/trips", "GET", headers, timeout=10)
        s2, _, b2 = proxy_request(f"{target}/api/stats", "GET", headers, timeout=10)
        if s1 == 200:
            trips = json.loads(b1).get("trips", [])
            now = datetime.now().strftime("%Y-%m-%d")
            # Date strings are compared lexically -- assumes ISO YYYY-MM-DD
            # from the trips service (TODO confirm).
            active = [t for t in trips if t.get("start_date", "") <= now <= t.get("end_date", "")]
            upcoming = sorted(
                [t for t in trips if t.get("start_date", "") > now],
                key=lambda t: t.get("start_date", "")
            )[:3]
            widget_data = {"active": active[:1], "upcoming": upcoming}
            if s2 == 200:
                widget_data["stats"] = json.loads(b2)
        return widget_data

    def fetch_fitness(app, headers):
        # Fitness widget: today's totals and goals for up to 4 users.
        target = app["proxy_target"]
        today = datetime.now().strftime("%Y-%m-%d")
        s1, _, b1 = proxy_request(f"{target}/api/users", "GET", headers, timeout=10)
        if s1 != 200:
            return None
        users = json.loads(b1)
        # Fetch all user data in parallel
        def get_user_data(u):
            s2, _, b2 = proxy_request(f"{target}/api/entries/totals?date={today}&user_id={u['id']}", "GET", headers, timeout=5)
            s3, _, b3 = proxy_request(f"{target}/api/goals/for-date?date={today}&user_id={u['id']}", "GET", headers, timeout=5)
            return {
                "user": u,
                "totals": json.loads(b2) if s2 == 200 else None,
                "goal": json.loads(b3) if s3 == 200 else None
            }
        with ThreadPoolExecutor(max_workers=4) as inner:
            people = list(inner.map(get_user_data, users[:4]))
        return {"people": people, "date": today}

    def fetch_inventory(app):
        # Inventory widget: count of items needing review (service-level key).
        target = app["proxy_target"]
        hdrs = {"X-API-Key": INVENTORY_SERVICE_API_KEY} if INVENTORY_SERVICE_API_KEY else {}
        s1, _, b1 = proxy_request(f"{target}/needs-review-count", "GET", hdrs, timeout=10)
        return json.loads(b1) if s1 == 200 else None

    def fetch_budget(app):
        # Budget widget: monthly summary trimmed to the fields the UI shows.
        # Any failure is logged and reported as "no data" (returns None).
        try:
            hdrs = {"X-API-Key": BUDGET_SERVICE_API_KEY} if BUDGET_SERVICE_API_KEY else {}
            s, _, b = proxy_request(f"{app['proxy_target']}/summary", "GET", hdrs, timeout=8)
            if s == 200:
                data = json.loads(b)
                return {"count": data.get("transactionCount", 0), "totalBalance": data.get("totalBalanceDollars", 0), "spending": data.get("spendingDollars", 0), "income": data.get("incomeDollars", 0), "topCategories": data.get("topCategories", [])[:5], "month": data.get("month", "")}
        except Exception as e:
            print(f"[Dashboard] Budget fetch error: {e}")
        return None

    def fetch_reader(app):
        # Reader widget: total unread count plus the 10 newest unread entries.
        target = app["proxy_target"]
        hdrs = {"X-Auth-Token": MINIFLUX_API_KEY} if MINIFLUX_API_KEY else {}
        s1, _, b1 = proxy_request(f"{target}/v1/feeds/counters", "GET", hdrs, timeout=10)
        if s1 != 200:
            return None
        counters = json.loads(b1)
        total_unread = sum(counters.get("unreads", {}).values())
        entries = []
        s2, _, b2 = proxy_request(
            f"{target}/v1/entries?status=unread&direction=desc&order=published_at&limit=10",
            "GET",
            hdrs,
            timeout=10
        )
        if s2 == 200:
            payload = json.loads(b2)
            for entry in payload.get("entries", [])[:10]:
                feed = entry.get("feed") or {}
                # Normalize to the subset of fields the dashboard renders,
                # with fallbacks for missing titles/authors.
                entries.append({
                    "id": entry.get("id"),
                    "title": entry.get("title") or "Untitled article",
                    "url": entry.get("url"),
                    "published_at": entry.get("published_at"),
                    "author": entry.get("author") or "",
                    "reading_time": entry.get("reading_time") or 0,
                    "feed": {
                        "id": feed.get("id"),
                        "title": feed.get("title") or "Feed"
                    }
                })
        return {"unread": total_unread, "articles": entries}

    # Launch all widget fetches in parallel
    with ThreadPoolExecutor(max_workers=3) as executor:
        for app in apps:
            app = dict(app)
            svc_token = get_service_token(user["id"], app["id"])
            is_service_level = app["id"] in SERVICE_LEVEL_AUTH
            if not svc_token and not is_service_level:
                # Not connected and no platform-level auth: emit a
                # placeholder widget so the UI can prompt to connect.
                widgets.append({"app": app["id"], "name": app["name"], "widget": app["dashboard_widget"], "connected": False, "data": None})
                continue
            headers = {"Authorization": f"Bearer {svc_token['auth_token']}"} if svc_token else {}
            # Trips fallback: use the platform-wide token when the user has
            # no personal connection.
            if not headers and app["id"] == "trips" and TRIPS_API_TOKEN:
                headers = {"Authorization": f"Bearer {TRIPS_API_TOKEN}"}
            # Dispatch on the widget type declared in the apps table.
            if app["dashboard_widget"] == "upcoming_trips":
                futures[executor.submit(fetch_trips, app, headers)] = app
            elif app["dashboard_widget"] == "daily_calories":
                futures[executor.submit(fetch_fitness, app, headers)] = app
            elif app["dashboard_widget"] == "items_issues":
                futures[executor.submit(fetch_inventory, app)] = app
            elif app["dashboard_widget"] == "unread_count":
                futures[executor.submit(fetch_reader, app)] = app
            elif app["dashboard_widget"] == "budget_summary":
                futures[executor.submit(fetch_budget, app)] = app
        for future in as_completed(futures):
            app = futures[future]
            try:
                widget_data = future.result()
            except Exception as e:
                # A failing service degrades to an empty widget, never a 500.
                print(f"[Dashboard] Error fetching {app['id']}: {e}")
                widget_data = None
            widgets.append({"app": app["id"], "name": app["name"], "widget": app["dashboard_widget"], "connected": True, "data": widget_data})
    # Sort by original app order
    app_order = {dict(a)["id"]: dict(a)["sort_order"] for a in apps}
    widgets.sort(key=lambda w: app_order.get(w["app"], 99))
    # Include pinned items
    conn2 = get_db()
    pinned_rows = conn2.execute(
        "SELECT * FROM pinned_items WHERE user_id = ? ORDER BY pinned_at DESC",
        (user["id"],)
    ).fetchall()
    conn2.close()
    pinned = [dict(r) for r in pinned_rows]
    result = {"widgets": widgets, "pinned": pinned}
    # Populate the module-level single-slot cache (last requester wins).
    _dashboard_cache["data"] = result
    _dashboard_cache["expires"] = _time.time() + _DASHBOARD_TTL
    _dashboard_cache["user_id"] = user["id"]
    handler._send_json(result)