Files
platform/gateway/dashboard.py

315 lines
12 KiB
Python
Raw Permalink Normal View History

"""
Platform Gateway Dashboard, apps, user profile, connections, pinning handlers.
"""
import json
from datetime import datetime
from config import TRIPS_API_TOKEN, MINIFLUX_API_KEY, INVENTORY_SERVICE_API_KEY, BUDGET_SERVICE_API_KEY
from database import get_db
from sessions import get_service_token, set_service_token, delete_service_token
import proxy as _proxy_module
from proxy import proxy_request
def handle_apps(handler, user):
conn = get_db()
rows = conn.execute("SELECT * FROM apps WHERE enabled = 1 ORDER BY sort_order").fetchall()
# Check which services the user has connected
connections = conn.execute("SELECT service FROM service_connections WHERE user_id = ?",
(user["id"],)).fetchall()
connected = {r["service"] for r in connections}
conn.close()
apps = []
for row in rows:
app = dict(row)
app["connected"] = app["id"] in connected
del app["proxy_target"] # don't expose internal URLs
apps.append(app)
handler._send_json({"apps": apps})
def handle_me(handler, user):
conn = get_db()
connections = conn.execute(
"SELECT service, auth_type, created_at FROM service_connections WHERE user_id = ?",
(user["id"],)
).fetchall()
conn.close()
handler._send_json({
"user": {
"id": user["id"],
"username": user["username"],
"display_name": user["display_name"],
"created_at": user["created_at"]
},
"connections": [dict(c) for c in connections]
})
def handle_connections(handler, user):
conn = get_db()
connections = conn.execute(
"SELECT service, auth_type, created_at FROM service_connections WHERE user_id = ?",
(user["id"],)
).fetchall()
conn.close()
handler._send_json({"connections": [dict(c) for c in connections]})
def handle_set_connection(handler, user, body):
try:
data = json.loads(body)
except Exception as e:
handler._send_json({"error": "Invalid JSON"}, 400)
return
service = data.get("service", "")
action = data.get("action", "connect") # connect or disconnect
if action == "disconnect":
delete_service_token(user["id"], service)
handler._send_json({"success": True})
return
auth_token = data.get("token", "")
auth_type = data.get("auth_type", "bearer")
if not service or not auth_token:
handler._send_json({"error": "service and token required"}, 400)
return
# Validate token against the service
target = _proxy_module.SERVICE_MAP.get(service)
if not target:
handler._send_json({"error": f"Unknown service: {service}"}, 400)
return
# Test the token
if service == "trips":
test_url = f"{target}/api/trips"
headers = {"Authorization": f"Bearer {auth_token}"}
elif service == "fitness":
test_url = f"{target}/api/users"
headers = {"Authorization": f"Bearer {auth_token}"}
else:
test_url = f"{target}/api/health"
headers = {"Authorization": f"Bearer {auth_token}"}
status, _, _ = proxy_request(test_url, "GET", headers, timeout=10)
if status == 401:
handler._send_json({"error": "Invalid token for this service"}, 401)
return
set_service_token(user["id"], service, auth_token, auth_type)
handler._send_json({"success": True})
def handle_pin(handler, user, body):
try:
data = json.loads(body)
except Exception as e:
handler._send_json({"error": "Invalid JSON"}, 400)
return
item_id = str(data.get("item_id", ""))
item_name = data.get("item_name", "")
service = data.get("service", "inventory")
if not item_id:
handler._send_json({"error": "item_id required"}, 400)
return
conn = get_db()
conn.execute("""
INSERT INTO pinned_items (user_id, service, item_id, item_name)
VALUES (?, ?, ?, ?)
ON CONFLICT(user_id, service, item_id) DO UPDATE SET item_name = ?, pinned_at = CURRENT_TIMESTAMP
""", (user["id"], service, item_id, item_name, item_name))
conn.commit()
conn.close()
handler._send_json({"success": True})
def handle_unpin(handler, user, body):
try:
data = json.loads(body)
except Exception as e:
handler._send_json({"error": "Invalid JSON"}, 400)
return
item_id = str(data.get("item_id", ""))
service = data.get("service", "inventory")
conn = get_db()
conn.execute("DELETE FROM pinned_items WHERE user_id = ? AND service = ? AND item_id = ?",
(user["id"], service, item_id))
conn.commit()
conn.close()
handler._send_json({"success": True})
def handle_get_pinned(handler, user):
conn = get_db()
rows = conn.execute(
"SELECT * FROM pinned_items WHERE user_id = ? ORDER BY pinned_at DESC",
(user["id"],)
).fetchall()
conn.close()
handler._send_json({"pinned": [dict(r) for r in rows]})
def handle_dashboard(handler, user):
"""Aggregate dashboard data from connected services -- all fetches in parallel."""
from concurrent.futures import ThreadPoolExecutor, as_completed
conn = get_db()
apps = conn.execute("SELECT * FROM apps WHERE enabled = 1 ORDER BY sort_order").fetchall()
conn.close()
SERVICE_LEVEL_AUTH = {"inventory", "reader", "books", "music", "budget"}
widgets = []
futures = {}
def fetch_trips(app, headers):
target = app["proxy_target"]
widget_data = None
s1, _, b1 = proxy_request(f"{target}/api/trips", "GET", headers, timeout=10)
s2, _, b2 = proxy_request(f"{target}/api/stats", "GET", headers, timeout=10)
if s1 == 200:
trips = json.loads(b1).get("trips", [])
now = datetime.now().strftime("%Y-%m-%d")
active = [t for t in trips if t.get("start_date", "") <= now <= t.get("end_date", "")]
upcoming = sorted(
[t for t in trips if t.get("start_date", "") > now],
key=lambda t: t.get("start_date", "")
)[:3]
widget_data = {"active": active[:1], "upcoming": upcoming}
if s2 == 200:
widget_data["stats"] = json.loads(b2)
return widget_data
def fetch_fitness(app, headers):
target = app["proxy_target"]
today = datetime.now().strftime("%Y-%m-%d")
s1, _, b1 = proxy_request(f"{target}/api/users", "GET", headers, timeout=10)
if s1 != 200:
return None
users = json.loads(b1)
# Fetch all user data in parallel
def get_user_data(u):
s2, _, b2 = proxy_request(f"{target}/api/entries/totals?date={today}&user_id={u['id']}", "GET", headers, timeout=5)
s3, _, b3 = proxy_request(f"{target}/api/goals/for-date?date={today}&user_id={u['id']}", "GET", headers, timeout=5)
return {
"user": u,
"totals": json.loads(b2) if s2 == 200 else None,
"goal": json.loads(b3) if s3 == 200 else None
}
with ThreadPoolExecutor(max_workers=4) as inner:
people = list(inner.map(get_user_data, users[:4]))
return {"people": people, "date": today}
def fetch_inventory(app):
target = app["proxy_target"]
hdrs = {"X-API-Key": INVENTORY_SERVICE_API_KEY} if INVENTORY_SERVICE_API_KEY else {}
s1, _, b1 = proxy_request(f"{target}/needs-review-count", "GET", hdrs, timeout=10)
return json.loads(b1) if s1 == 200 else None
def fetch_budget(app):
try:
hdrs = {"X-API-Key": BUDGET_SERVICE_API_KEY} if BUDGET_SERVICE_API_KEY else {}
s, _, b = proxy_request(f"{app['proxy_target']}/summary", "GET", hdrs, timeout=8)
if s == 200:
data = json.loads(b)
return {"count": data.get("transactionCount", 0), "totalBalance": data.get("totalBalanceDollars", 0), "spending": data.get("spendingDollars", 0), "income": data.get("incomeDollars", 0), "topCategories": data.get("topCategories", [])[:5], "month": data.get("month", "")}
except Exception as e:
print(f"[Dashboard] Budget fetch error: {e}")
return None
def fetch_reader(app):
target = app["proxy_target"]
hdrs = {"X-Auth-Token": MINIFLUX_API_KEY} if MINIFLUX_API_KEY else {}
s1, _, b1 = proxy_request(f"{target}/v1/feeds/counters", "GET", hdrs, timeout=10)
if s1 != 200:
return None
counters = json.loads(b1)
total_unread = sum(counters.get("unreads", {}).values())
entries = []
s2, _, b2 = proxy_request(
f"{target}/v1/entries?status=unread&direction=desc&order=published_at&limit=10",
"GET",
hdrs,
timeout=10
)
if s2 == 200:
payload = json.loads(b2)
for entry in payload.get("entries", [])[:10]:
feed = entry.get("feed") or {}
entries.append({
"id": entry.get("id"),
"title": entry.get("title") or "Untitled article",
"url": entry.get("url"),
"published_at": entry.get("published_at"),
"author": entry.get("author") or "",
"reading_time": entry.get("reading_time") or 0,
"feed": {
"id": feed.get("id"),
"title": feed.get("title") or "Feed"
}
})
return {"unread": total_unread, "articles": entries}
# Launch all widget fetches in parallel
with ThreadPoolExecutor(max_workers=3) as executor:
for app in apps:
app = dict(app)
svc_token = get_service_token(user["id"], app["id"])
is_service_level = app["id"] in SERVICE_LEVEL_AUTH
if not svc_token and not is_service_level:
widgets.append({"app": app["id"], "name": app["name"], "widget": app["dashboard_widget"], "connected": False, "data": None})
continue
headers = {"Authorization": f"Bearer {svc_token['auth_token']}"} if svc_token else {}
if not headers and app["id"] == "trips" and TRIPS_API_TOKEN:
headers = {"Authorization": f"Bearer {TRIPS_API_TOKEN}"}
if app["dashboard_widget"] == "upcoming_trips":
futures[executor.submit(fetch_trips, app, headers)] = app
elif app["dashboard_widget"] == "daily_calories":
futures[executor.submit(fetch_fitness, app, headers)] = app
elif app["dashboard_widget"] == "items_issues":
futures[executor.submit(fetch_inventory, app)] = app
elif app["dashboard_widget"] == "unread_count":
futures[executor.submit(fetch_reader, app)] = app
elif app["dashboard_widget"] == "budget_summary":
futures[executor.submit(fetch_budget, app)] = app
for future in as_completed(futures):
app = futures[future]
try:
widget_data = future.result()
except Exception as e:
print(f"[Dashboard] Error fetching {app['id']}: {e}")
widget_data = None
widgets.append({"app": app["id"], "name": app["name"], "widget": app["dashboard_widget"], "connected": True, "data": widget_data})
# Sort by original app order
app_order = {dict(a)["id"]: dict(a)["sort_order"] for a in apps}
widgets.sort(key=lambda w: app_order.get(w["app"], 99))
# Include pinned items
conn2 = get_db()
pinned_rows = conn2.execute(
"SELECT * FROM pinned_items WHERE user_id = ? ORDER BY pinned_at DESC",
(user["id"],)
).fetchall()
conn2.close()
pinned = [dict(r) for r in pinned_rows]
handler._send_json({"widgets": widgets, "pinned": pinned})