""" Platform Gateway — Dashboard, apps, user profile, connections, pinning handlers. """ import json from datetime import datetime from config import TRIPS_API_TOKEN, MINIFLUX_API_KEY from database import get_db from sessions import get_service_token, set_service_token, delete_service_token import proxy as _proxy_module from proxy import proxy_request def handle_apps(handler, user): conn = get_db() rows = conn.execute("SELECT * FROM apps WHERE enabled = 1 ORDER BY sort_order").fetchall() # Check which services the user has connected connections = conn.execute("SELECT service FROM service_connections WHERE user_id = ?", (user["id"],)).fetchall() connected = {r["service"] for r in connections} conn.close() apps = [] for row in rows: app = dict(row) app["connected"] = app["id"] in connected del app["proxy_target"] # don't expose internal URLs apps.append(app) handler._send_json({"apps": apps}) def handle_me(handler, user): conn = get_db() connections = conn.execute( "SELECT service, auth_type, created_at FROM service_connections WHERE user_id = ?", (user["id"],) ).fetchall() conn.close() handler._send_json({ "user": { "id": user["id"], "username": user["username"], "display_name": user["display_name"], "created_at": user["created_at"] }, "connections": [dict(c) for c in connections] }) def handle_connections(handler, user): conn = get_db() connections = conn.execute( "SELECT service, auth_type, created_at FROM service_connections WHERE user_id = ?", (user["id"],) ).fetchall() conn.close() handler._send_json({"connections": [dict(c) for c in connections]}) def handle_set_connection(handler, user, body): try: data = json.loads(body) except Exception as e: handler._send_json({"error": "Invalid JSON"}, 400) return service = data.get("service", "") action = data.get("action", "connect") # connect or disconnect if action == "disconnect": delete_service_token(user["id"], service) handler._send_json({"success": True}) return auth_token = data.get("token", "") auth_type = data.get("auth_type", "bearer") if not service or not auth_token: handler._send_json({"error": "service and token required"}, 400) return # Validate token against the service target = _proxy_module.SERVICE_MAP.get(service) if not target: handler._send_json({"error": f"Unknown service: {service}"}, 400) return # Test the token if service == "trips": test_url = f"{target}/api/trips" headers = {"Authorization": f"Bearer {auth_token}"} elif service == "fitness": test_url = f"{target}/api/users" headers = {"Authorization": f"Bearer {auth_token}"} else: test_url = f"{target}/api/health" headers = {"Authorization": f"Bearer {auth_token}"} status, _, _ = proxy_request(test_url, "GET", headers, timeout=10) if status == 401: handler._send_json({"error": "Invalid token for this service"}, 401) return set_service_token(user["id"], service, auth_token, auth_type) handler._send_json({"success": True}) def handle_pin(handler, user, body): try: data = json.loads(body) except Exception as e: handler._send_json({"error": "Invalid JSON"}, 400) return item_id = str(data.get("item_id", "")) item_name = data.get("item_name", "") service = data.get("service", "inventory") if not item_id: handler._send_json({"error": "item_id required"}, 400) return conn = get_db() conn.execute(""" INSERT INTO pinned_items (user_id, service, item_id, item_name) VALUES (?, ?, ?, ?) ON CONFLICT(user_id, service, item_id) DO UPDATE SET item_name = ?, pinned_at = CURRENT_TIMESTAMP """, (user["id"], service, item_id, item_name, item_name)) conn.commit() conn.close() handler._send_json({"success": True}) def handle_unpin(handler, user, body): try: data = json.loads(body) except Exception as e: handler._send_json({"error": "Invalid JSON"}, 400) return item_id = str(data.get("item_id", "")) service = data.get("service", "inventory") conn = get_db() conn.execute("DELETE FROM pinned_items WHERE user_id = ? AND service = ? AND item_id = ?", (user["id"], service, item_id)) conn.commit() conn.close() handler._send_json({"success": True}) def handle_get_pinned(handler, user): conn = get_db() rows = conn.execute( "SELECT * FROM pinned_items WHERE user_id = ? ORDER BY pinned_at DESC", (user["id"],) ).fetchall() conn.close() handler._send_json({"pinned": [dict(r) for r in rows]}) def handle_dashboard(handler, user): """Aggregate dashboard data from connected services -- all fetches in parallel.""" from concurrent.futures import ThreadPoolExecutor, as_completed conn = get_db() apps = conn.execute("SELECT * FROM apps WHERE enabled = 1 ORDER BY sort_order").fetchall() conn.close() SERVICE_LEVEL_AUTH = {"inventory", "reader", "books", "music", "budget"} widgets = [] futures = {} def fetch_trips(app, headers): target = app["proxy_target"] widget_data = None s1, _, b1 = proxy_request(f"{target}/api/trips", "GET", headers, timeout=10) s2, _, b2 = proxy_request(f"{target}/api/stats", "GET", headers, timeout=10) if s1 == 200: trips = json.loads(b1).get("trips", []) now = datetime.now().strftime("%Y-%m-%d") active = [t for t in trips if t.get("start_date", "") <= now <= t.get("end_date", "")] upcoming = sorted( [t for t in trips if t.get("start_date", "") > now], key=lambda t: t.get("start_date", "") )[:3] widget_data = {"active": active[:1], "upcoming": upcoming} if s2 == 200: widget_data["stats"] = json.loads(b2) return widget_data def fetch_fitness(app, headers): target = app["proxy_target"] today = datetime.now().strftime("%Y-%m-%d") s1, _, b1 = proxy_request(f"{target}/api/users", "GET", headers, timeout=10) if s1 != 200: return None users = json.loads(b1) # Fetch all user data in parallel def get_user_data(u): s2, _, b2 = proxy_request(f"{target}/api/entries/totals?date={today}&user_id={u['id']}", "GET", headers, timeout=5) s3, _, b3 = proxy_request(f"{target}/api/goals/for-date?date={today}&user_id={u['id']}", "GET", headers, timeout=5) return { "user": u, "totals": json.loads(b2) if s2 == 200 else None, "goal": json.loads(b3) if s3 == 200 else None } with ThreadPoolExecutor(max_workers=4) as inner: people = list(inner.map(get_user_data, users[:4])) return {"people": people, "date": today} def fetch_inventory(app): target = app["proxy_target"] s1, _, b1 = proxy_request(f"{target}/needs-review-count", "GET", {}, timeout=10) return json.loads(b1) if s1 == 200 else None def fetch_budget(app): try: s, _, b = proxy_request(f"{app['proxy_target']}/summary", "GET", {}, timeout=8) if s == 200: data = json.loads(b) return {"count": data.get("transactionCount", 0), "totalBalance": data.get("totalBalanceDollars", 0), "spending": data.get("spendingDollars", 0), "income": data.get("incomeDollars", 0), "topCategories": data.get("topCategories", [])[:5], "month": data.get("month", "")} except Exception as e: pass return None def fetch_reader(app): target = app["proxy_target"] hdrs = {"X-Auth-Token": MINIFLUX_API_KEY} if MINIFLUX_API_KEY else {} s1, _, b1 = proxy_request(f"{target}/v1/feeds/counters", "GET", hdrs, timeout=10) if s1 != 200: return None counters = json.loads(b1) total_unread = sum(counters.get("unreads", {}).values()) entries = [] s2, _, b2 = proxy_request( f"{target}/v1/entries?status=unread&direction=desc&order=published_at&limit=10", "GET", hdrs, timeout=10 ) if s2 == 200: payload = json.loads(b2) for entry in payload.get("entries", [])[:10]: feed = entry.get("feed") or {} entries.append({ "id": entry.get("id"), "title": entry.get("title") or "Untitled article", "url": entry.get("url"), "published_at": entry.get("published_at"), "author": entry.get("author") or "", "reading_time": entry.get("reading_time") or 0, "feed": { "id": feed.get("id"), "title": feed.get("title") or "Feed" } }) return {"unread": total_unread, "articles": entries} # Launch all widget fetches in parallel with ThreadPoolExecutor(max_workers=3) as executor: for app in apps: app = dict(app) svc_token = get_service_token(user["id"], app["id"]) is_service_level = app["id"] in SERVICE_LEVEL_AUTH if not svc_token and not is_service_level: widgets.append({"app": app["id"], "name": app["name"], "widget": app["dashboard_widget"], "connected": False, "data": None}) continue headers = {"Authorization": f"Bearer {svc_token['auth_token']}"} if svc_token else {} if not headers and app["id"] == "trips" and TRIPS_API_TOKEN: headers = {"Authorization": f"Bearer {TRIPS_API_TOKEN}"} if app["dashboard_widget"] == "upcoming_trips": futures[executor.submit(fetch_trips, app, headers)] = app elif app["dashboard_widget"] == "daily_calories": futures[executor.submit(fetch_fitness, app, headers)] = app elif app["dashboard_widget"] == "items_issues": futures[executor.submit(fetch_inventory, app)] = app elif app["dashboard_widget"] == "unread_count": futures[executor.submit(fetch_reader, app)] = app elif app["dashboard_widget"] == "budget_summary": futures[executor.submit(fetch_budget, app)] = app for future in as_completed(futures): app = futures[future] try: widget_data = future.result() except Exception as e: print(f"[Dashboard] Error fetching {app['id']}: {e}") widget_data = None widgets.append({"app": app["id"], "name": app["name"], "widget": app["dashboard_widget"], "connected": True, "data": widget_data}) # Sort by original app order app_order = {dict(a)["id"]: dict(a)["sort_order"] for a in apps} widgets.sort(key=lambda w: app_order.get(w["app"], 99)) # Include pinned items conn2 = get_db() pinned_rows = conn2.execute( "SELECT * FROM pinned_items WHERE user_id = ? ORDER BY pinned_at DESC", (user["id"],) ).fetchall() conn2.close() pinned = [dict(r) for r in pinned_rows] handler._send_json({"widgets": widgets, "pinned": pinned})