#!/usr/bin/env python3 """ Calorie Tracker - Self-hosted calorie & macro tracker with SQLite backend Replaces SparkyFitness with fuzzy food matching, confidence-based resolve, and AI intake support. """ import os import json import sqlite3 import uuid import hashlib import secrets import re import unicodedata from http.server import HTTPServer, BaseHTTPRequestHandler from http.cookies import SimpleCookie from datetime import datetime, date, timedelta from pathlib import Path from urllib.parse import urlparse, parse_qs from difflib import SequenceMatcher import urllib.request import urllib.error import logging logger = logging.getLogger(__name__) # Configuration PORT = int(os.environ.get("PORT", 8095)) DATA_DIR = Path(os.environ.get("DATA_DIR", "/app/data")) DB_PATH = DATA_DIR / "calories.db" IMAGES_DIR = DATA_DIR / "images" API_KEY = os.environ.get("CALORIES_API_KEY", "") # For service-to-service (Telegram) USDA_API_KEY = os.environ.get("USDA_API_KEY", "") # Free from https://fdc.nal.usda.gov/api-key-signup.html OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "") # For AI nutrition estimation OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini") # Fast + cheap for estimation GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "") # For food image search GOOGLE_CX = os.environ.get("GOOGLE_CX", "") # Google Custom Search engine ID # Ensure directories exist DATA_DIR.mkdir(parents=True, exist_ok=True) IMAGES_DIR.mkdir(parents=True, exist_ok=True) # ─── Database ──────────────────────────────────────────────────────────────── def get_db(): """Get database connection.""" conn = sqlite3.connect(str(DB_PATH)) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") conn.execute("PRAGMA journal_mode = WAL") return conn def normalize_food_name(name: str) -> str: """Normalize a food name for matching: lowercase, strip, collapse whitespace, remove accents.""" if not name: return "" # Remove accents name = unicodedata.normalize('NFKD', 
name).encode('ascii', 'ignore').decode('ascii') # Lowercase, strip, collapse whitespace name = re.sub(r'\s+', ' ', name.lower().strip()) # Remove common filler words for matching name = re.sub(r'\b(the|a|an|of|with|and|in)\b', '', name).strip() name = re.sub(r'\s+', ' ', name) return name def _naive_singularize(name: str) -> str: """Strip common plural suffixes for search retry. Conservative — only trailing 's'.""" lower = name.lower().strip() if lower.endswith('ies'): return name[:-3] + 'y' # berries -> berry if lower.endswith('s') and not lower.endswith('ss') and len(lower) > 3: return name[:-1] return name def tokenize_food_name(name: str) -> set: """Break a normalized food name into tokens for overlap matching.""" return set(normalize_food_name(name).split()) def similarity_score(a: str, b: str) -> float: """Compute similarity between two food names (0.0 to 1.0).""" norm_a = normalize_food_name(a) norm_b = normalize_food_name(b) # Exact normalized match if norm_a == norm_b: return 1.0 # Token overlap (Jaccard) tokens_a = tokenize_food_name(a) tokens_b = tokenize_food_name(b) if tokens_a and tokens_b: jaccard = len(tokens_a & tokens_b) / len(tokens_a | tokens_b) else: jaccard = 0.0 # Sequence similarity seq = SequenceMatcher(None, norm_a, norm_b).ratio() # Weighted combination return max(jaccard * 0.6 + seq * 0.4, seq) def init_db(): """Initialize database schema.""" conn = get_db() cursor = conn.cursor() # ── Users ── cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id TEXT PRIMARY KEY, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, display_name TEXT NOT NULL, telegram_user_id TEXT UNIQUE, created_at TEXT DEFAULT CURRENT_TIMESTAMP ) ''') # ── Sessions ── cursor.execute(''' CREATE TABLE IF NOT EXISTS sessions ( token TEXT PRIMARY KEY, user_id TEXT NOT NULL, created_at TEXT DEFAULT CURRENT_TIMESTAMP, expires_at TEXT NOT NULL, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ) ''') # ── Foods (master records, nutrition per_100g as 
base) ── cursor.execute(''' CREATE TABLE IF NOT EXISTS foods ( id TEXT PRIMARY KEY, name TEXT NOT NULL, normalized_name TEXT NOT NULL, brand TEXT, brand_normalized TEXT, barcode TEXT, notes TEXT, -- Nutrition per base unit (per_100g for weight-based, per serving for countable items) calories_per_base REAL NOT NULL DEFAULT 0, protein_per_base REAL NOT NULL DEFAULT 0, carbs_per_base REAL NOT NULL DEFAULT 0, fat_per_base REAL NOT NULL DEFAULT 0, -- Base unit: "100g" for weight-based foods, or "piece"/"slice"/"serving" etc for countable base_unit TEXT NOT NULL DEFAULT '100g', -- Status: confirmed, ai_created, needs_review, archived status TEXT NOT NULL DEFAULT 'confirmed', created_by_user_id TEXT, is_shared INTEGER NOT NULL DEFAULT 1, image_path TEXT, -- Filename in data/images/ created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (created_by_user_id) REFERENCES users(id) ) ''') # ── Food servings (named portion definitions for a food) ── cursor.execute(''' CREATE TABLE IF NOT EXISTS food_servings ( id TEXT PRIMARY KEY, food_id TEXT NOT NULL, name TEXT NOT NULL, -- e.g. 
"1 cup", "1 slice", "small bowl", "medium plate" -- How much of the base unit this serving represents -- For base_unit=100g: amount_in_base=1.5 means 150g -- For base_unit=piece: amount_in_base=1 means 1 piece amount_in_base REAL NOT NULL DEFAULT 1.0, is_default INTEGER NOT NULL DEFAULT 0, created_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (food_id) REFERENCES foods(id) ON DELETE CASCADE ) ''') # ── Food aliases (for fuzzy matching) ── cursor.execute(''' CREATE TABLE IF NOT EXISTS food_aliases ( id TEXT PRIMARY KEY, food_id TEXT NOT NULL, alias TEXT NOT NULL, alias_normalized TEXT NOT NULL, created_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (food_id) REFERENCES foods(id) ON DELETE CASCADE, UNIQUE(alias_normalized) ) ''') # ── Food entries (daily log with immutable snapshots) ── cursor.execute(''' CREATE TABLE IF NOT EXISTS food_entries ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, food_id TEXT, -- Reference to food (nullable for quick-add) -- Meal type: breakfast, lunch, dinner, snack meal_type TEXT NOT NULL, entry_date TEXT NOT NULL, -- YYYY-MM-DD -- Entry type: food (normal), quick_add (calories only) entry_type TEXT NOT NULL DEFAULT 'food', -- Quantity and unit at log time quantity REAL NOT NULL DEFAULT 1.0, unit TEXT NOT NULL DEFAULT 'serving', serving_description TEXT, -- e.g. "1 medium bowl", "2 slices" -- IMMUTABLE SNAPSHOT: full nutrition + context at time of logging snapshot_food_name TEXT NOT NULL, snapshot_serving_label TEXT, -- e.g. 
"1 breast (170g)", "1 cup" snapshot_grams REAL, -- estimated grams if known snapshot_calories REAL NOT NULL DEFAULT 0, snapshot_protein REAL NOT NULL DEFAULT 0, snapshot_carbs REAL NOT NULL DEFAULT 0, snapshot_fat REAL NOT NULL DEFAULT 0, -- Source & method source TEXT NOT NULL DEFAULT 'web', -- where: web, telegram, api entry_method TEXT NOT NULL DEFAULT 'manual', -- how: manual, search, template, ai_plate, ai_label, quick_add raw_text TEXT, -- Original text from Telegram/AI confidence_score REAL, -- 0.0 to 1.0 from resolve note TEXT, -- User note image_ref TEXT, -- Reference to uploaded image (AI photo logging) -- AI metadata (JSON blob if from AI) ai_metadata TEXT, -- Idempotency: prevent duplicate entries from retries idempotency_key TEXT UNIQUE, created_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id), FOREIGN KEY (food_id) REFERENCES foods(id) ) ''') # ── Goals (date-ranged) ── cursor.execute(''' CREATE TABLE IF NOT EXISTS goals ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, start_date TEXT NOT NULL, -- YYYY-MM-DD end_date TEXT, -- NULL = still active calories REAL NOT NULL DEFAULT 2000, protein REAL NOT NULL DEFAULT 150, carbs REAL NOT NULL DEFAULT 200, fat REAL NOT NULL DEFAULT 65, is_active INTEGER NOT NULL DEFAULT 1, created_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id) ) ''') # ── Meal templates ── cursor.execute(''' CREATE TABLE IF NOT EXISTS meal_templates ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, name TEXT NOT NULL, meal_type TEXT, -- Optional default meal type is_favorite INTEGER NOT NULL DEFAULT 0, is_archived INTEGER NOT NULL DEFAULT 0, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id) ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS meal_template_items ( id TEXT PRIMARY KEY, template_id TEXT NOT NULL, food_id TEXT NOT NULL, quantity REAL NOT NULL DEFAULT 1.0, unit TEXT NOT NULL DEFAULT 'serving', 
serving_description TEXT, -- Snapshot for display (so template shows correct info even if food changes) snapshot_food_name TEXT NOT NULL, snapshot_calories REAL NOT NULL DEFAULT 0, snapshot_protein REAL NOT NULL DEFAULT 0, snapshot_carbs REAL NOT NULL DEFAULT 0, snapshot_fat REAL NOT NULL DEFAULT 0, created_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (template_id) REFERENCES meal_templates(id) ON DELETE CASCADE, FOREIGN KEY (food_id) REFERENCES foods(id) ) ''') # ── Food resolution queue (for low-confidence matches) ── cursor.execute(''' CREATE TABLE IF NOT EXISTS food_resolution_queue ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, raw_text TEXT NOT NULL, proposed_food_id TEXT, candidates_json TEXT, -- JSON array of {food_id, name, score} confidence REAL NOT NULL DEFAULT 0, meal_type TEXT, entry_date TEXT, quantity REAL, unit TEXT, source TEXT, -- Resolution resolved_food_id TEXT, resolved_at TEXT, resolution_action TEXT, -- matched, created, merged, dismissed created_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id), FOREIGN KEY (proposed_food_id) REFERENCES foods(id), FOREIGN KEY (resolved_food_id) REFERENCES foods(id) ) ''') # ── User favorites ── cursor.execute(''' CREATE TABLE IF NOT EXISTS user_favorites ( user_id TEXT NOT NULL, food_id TEXT NOT NULL, created_at TEXT DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (user_id, food_id), FOREIGN KEY (user_id) REFERENCES users(id), FOREIGN KEY (food_id) REFERENCES foods(id) ON DELETE CASCADE ) ''') # ── Audit log (merges, resolutions, AI edits) ── cursor.execute(''' CREATE TABLE IF NOT EXISTS audit_log ( id TEXT PRIMARY KEY, user_id TEXT, action TEXT NOT NULL, -- food_merged, queue_resolved, entry_ai_edited, food_archived, food_created entity_type TEXT NOT NULL, -- food, entry, queue, template entity_id TEXT, details TEXT, -- JSON blob with action-specific details created_at TEXT DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id) ) ''') # ── FTS5 virtual table for food 
search ── cursor.execute(''' CREATE VIRTUAL TABLE IF NOT EXISTS foods_fts USING fts5( name, normalized_name, brand, content=foods, content_rowid=rowid ) ''') # ── FTS5 for aliases ── cursor.execute(''' CREATE VIRTUAL TABLE IF NOT EXISTS aliases_fts USING fts5( alias, alias_normalized, content=food_aliases, content_rowid=rowid ) ''') # ── Indexes ── cursor.execute('CREATE INDEX IF NOT EXISTS idx_foods_normalized ON foods(normalized_name)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_foods_status ON foods(status)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_foods_brand_norm ON foods(brand_normalized)') cursor.execute('CREATE UNIQUE INDEX IF NOT EXISTS idx_foods_barcode_unique ON foods(barcode) WHERE barcode IS NOT NULL') cursor.execute('CREATE INDEX IF NOT EXISTS idx_food_aliases_normalized ON food_aliases(alias_normalized)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_food_entries_user_date ON food_entries(user_id, entry_date)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_food_entries_user_meal ON food_entries(user_id, meal_type)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_food_entries_food ON food_entries(food_id)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_food_entries_food_date ON food_entries(food_id, entry_date)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_food_entries_idempotency ON food_entries(idempotency_key) WHERE idempotency_key IS NOT NULL') cursor.execute('CREATE INDEX IF NOT EXISTS idx_goals_user_date ON goals(user_id, start_date)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_goals_user_active ON goals(user_id, is_active)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_resolution_queue_user ON food_resolution_queue(user_id, resolved_at)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_user_telegram ON users(telegram_user_id)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_audit_entity ON audit_log(entity_type, entity_id)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_templates_user ON meal_templates(user_id, 
is_archived)') # ── Triggers to keep FTS5 in sync ── cursor.executescript(''' CREATE TRIGGER IF NOT EXISTS foods_ai AFTER INSERT ON foods BEGIN INSERT INTO foods_fts(rowid, name, normalized_name, brand) VALUES (new.rowid, new.name, new.normalized_name, new.brand); END; CREATE TRIGGER IF NOT EXISTS foods_ad AFTER DELETE ON foods BEGIN INSERT INTO foods_fts(foods_fts, rowid, name, normalized_name, brand) VALUES ('delete', old.rowid, old.name, old.normalized_name, old.brand); END; CREATE TRIGGER IF NOT EXISTS foods_au AFTER UPDATE ON foods BEGIN INSERT INTO foods_fts(foods_fts, rowid, name, normalized_name, brand) VALUES ('delete', old.rowid, old.name, old.normalized_name, old.brand); INSERT INTO foods_fts(rowid, name, normalized_name, brand) VALUES (new.rowid, new.name, new.normalized_name, new.brand); END; CREATE TRIGGER IF NOT EXISTS aliases_ai AFTER INSERT ON food_aliases BEGIN INSERT INTO aliases_fts(rowid, alias, alias_normalized) VALUES (new.rowid, new.alias, new.alias_normalized); END; CREATE TRIGGER IF NOT EXISTS aliases_ad AFTER DELETE ON food_aliases BEGIN INSERT INTO aliases_fts(aliases_fts, rowid, alias, alias_normalized) VALUES ('delete', old.rowid, old.alias, old.alias_normalized); END; CREATE TRIGGER IF NOT EXISTS aliases_au AFTER UPDATE ON food_aliases BEGIN INSERT INTO aliases_fts(aliases_fts, rowid, alias, alias_normalized) VALUES ('delete', old.rowid, old.alias, old.alias_normalized); INSERT INTO aliases_fts(rowid, alias, alias_normalized) VALUES (new.rowid, new.alias, new.alias_normalized); END; ''') # Migration: add image_path to foods if missing try: cursor.execute("ALTER TABLE foods ADD COLUMN image_path TEXT") conn.commit() except: pass conn.commit() conn.close() # ─── Food image search & download ─────────────────────────────────────────── def search_google_images(query: str, num: int = 6) -> list: """Search Google Images for food photos.""" if not GOOGLE_API_KEY or not GOOGLE_CX: return [] url = (f"https://www.googleapis.com/customsearch/v1" 
f"?key={GOOGLE_API_KEY}&cx={GOOGLE_CX}&searchType=image" f"&q={urllib.parse.quote(query + ' food')}&num={num}") try: req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) with urllib.request.urlopen(req, timeout=10) as resp: data = json.loads(resp.read().decode()) return [ {'url': item.get('link'), 'thumbnail': item.get('image', {}).get('thumbnailLink'), 'title': item.get('title')} for item in data.get('items', []) ] except Exception as e: logger.warning(f"Google image search failed: {e}") return [] def download_food_image(image_url: str, food_id: str) -> str | None: """Download an image from URL and save to data/images/. Returns filename.""" try: req = urllib.request.Request(image_url, headers={ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Accept": "image/*,*/*", "Referer": "https://www.google.com/", }) with urllib.request.urlopen(req, timeout=15) as resp: image_bytes = resp.read() content_type = resp.headers.get("Content-Type", "image/jpeg") ext_map = {"image/jpeg": ".jpg", "image/png": ".png", "image/gif": ".gif", "image/webp": ".webp"} ext = ext_map.get(content_type.split(";")[0], ".jpg") # Unique filename each time so browser cache doesn't serve stale image filename = f"{food_id}_{uuid.uuid4().hex[:8]}{ext}" filepath = IMAGES_DIR / filename # Delete old image file if exists conn = get_db() old = conn.execute("SELECT image_path FROM foods WHERE id = ?", (food_id,)).fetchone() if old and old['image_path']: old_path = IMAGES_DIR / old['image_path'] if old_path.exists(): old_path.unlink() with open(filepath, "wb") as f: f.write(image_bytes) # Update food record conn.execute("UPDATE foods SET image_path = ? 
WHERE id = ?", (filename, food_id)) conn.commit() conn.close() return filename except Exception as e: logger.warning(f"Failed to download food image: {e}") return None def auto_fetch_food_image(food_id: str, food_name: str, brand: str = None): """Auto-fetch an image for a food using Google Image Search. Runs in background.""" import threading def _fetch(): query = f"{brand} {food_name}" if brand else food_name images = search_google_images(query, num=6) # Try full-size URLs first (sharp images), thumbnail as last resort for img in images: url = img.get('url') if url: result = download_food_image(url, food_id) if result: return # All full URLs failed — fall back to thumbnails for img in images: thumb = img.get('thumbnail') if thumb: result = download_food_image(thumb, food_id) if result: return threading.Thread(target=_fetch, daemon=True).start() def seed_default_users(): """Create default users if they don't exist.""" conn = get_db() cursor = conn.cursor() users = [ { "id": str(uuid.uuid4()), "username": os.environ.get("USER1_USERNAME", "yusuf"), "password": os.environ.get("USER1_PASSWORD", "changeme"), "display_name": os.environ.get("USER1_DISPLAY_NAME", "Yusuf"), "telegram_user_id": os.environ.get("USER1_TELEGRAM_ID", "5878604567"), }, { "id": str(uuid.uuid4()), "username": os.environ.get("USER2_USERNAME", "madiha"), "password": os.environ.get("USER2_PASSWORD", "changeme"), "display_name": os.environ.get("USER2_DISPLAY_NAME", "Madiha"), "telegram_user_id": os.environ.get("USER2_TELEGRAM_ID", "6389024883"), }, ] for user in users: existing = cursor.execute("SELECT id FROM users WHERE username = ?", (user["username"],)).fetchone() if not existing: password_hash = hashlib.sha256(user["password"].encode()).hexdigest() cursor.execute( "INSERT INTO users (id, username, password_hash, display_name, telegram_user_id) VALUES (?, ?, ?, ?, ?)", (user["id"], user["username"], password_hash, user["display_name"], user["telegram_user_id"]) ) conn.commit() conn.close() # ─── 
Auth helpers ──────────────────────────────────────────────────────────── def create_session(user_id: str) -> str: """Create a new session token for a user.""" token = secrets.token_urlsafe(32) expires = (datetime.now() + timedelta(days=30)).isoformat() conn = get_db() conn.execute("INSERT INTO sessions (token, user_id, expires_at) VALUES (?, ?, ?)", (token, user_id, expires)) conn.commit() conn.close() return token def get_user_from_session(token: str) -> dict | None: """Get user from session token.""" if not token: return None conn = get_db() row = conn.execute(''' SELECT u.id, u.username, u.display_name, u.telegram_user_id FROM sessions s JOIN users u ON s.user_id = u.id WHERE s.token = ? AND s.expires_at > ? ''', (token, datetime.now().isoformat())).fetchone() conn.close() if row: return dict(row) return None def get_user_from_api_key_and_telegram(api_key: str, telegram_user_id: str) -> dict | None: """Authenticate via API key + Telegram user ID (for service-to-service).""" if not API_KEY or api_key != API_KEY: return None conn = get_db() row = conn.execute( "SELECT id, username, display_name, telegram_user_id FROM users WHERE telegram_user_id = ?", (telegram_user_id,) ).fetchone() conn.close() if row: return dict(row) return None # ─── Food search & resolve engine ──────────────────────────────────────────── def search_foods(query: str, user_id: str = None, limit: int = 20) -> list: """ Layered food search: 1. Exact normalized match 2. Alias exact match 3. Tokenized/contains match 4. FTS5 candidate retrieval 5. Similarity scoring over top candidates """ conn = get_db() norm_query = normalize_food_name(query) query_tokens = tokenize_food_name(query) candidates = {} # food_id -> {food_data, score, match_type} # Layer 1: Exact normalized match rows = conn.execute( "SELECT * FROM foods WHERE normalized_name = ? 
AND status != 'archived'", (norm_query,) ).fetchall() for row in rows: food = dict(row) candidates[food['id']] = {'food': food, 'score': 1.0, 'match_type': 'exact'} # Layer 2: Alias exact match alias_rows = conn.execute( "SELECT fa.food_id, f.* FROM food_aliases fa JOIN foods f ON fa.food_id = f.id " "WHERE fa.alias_normalized = ? AND f.status != 'archived'", (norm_query,) ).fetchall() for row in alias_rows: food = dict(row) fid = food['food_id'] if fid not in candidates: candidates[fid] = {'food': food, 'score': 0.95, 'match_type': 'alias_exact'} # Layer 3: Tokenized/contains match (all query tokens appear in food name) if query_tokens and len(candidates) < limit: like_clauses = " AND ".join(["normalized_name LIKE ?" for _ in query_tokens]) like_params = [f"%{t}%" for t in query_tokens] rows = conn.execute( f"SELECT * FROM foods WHERE {like_clauses} AND status != 'archived' LIMIT ?", like_params + [limit] ).fetchall() for row in rows: food = dict(row) if food['id'] not in candidates: score = similarity_score(query, food['name']) candidates[food['id']] = {'food': food, 'score': max(score, 0.7), 'match_type': 'token_match'} # Layer 4: FTS5 search if len(candidates) < limit: # Build FTS query: each token as a prefix match fts_terms = " OR ".join([f'"{t}"*' for t in query_tokens if t]) if fts_terms: try: fts_rows = conn.execute( "SELECT f.* FROM foods_fts fts JOIN foods f ON f.rowid = fts.rowid " "WHERE foods_fts MATCH ? 
AND f.status != 'archived' LIMIT ?", (fts_terms, limit * 2) ).fetchall() for row in fts_rows: food = dict(row) if food['id'] not in candidates: score = similarity_score(query, food['name']) candidates[food['id']] = {'food': food, 'score': score, 'match_type': 'fts'} except sqlite3.OperationalError: pass # FTS query syntax error, skip # Also search aliases FTS try: alias_fts_rows = conn.execute( "SELECT fa.food_id, f.* FROM aliases_fts afts " "JOIN food_aliases fa ON fa.rowid = afts.rowid " "JOIN foods f ON fa.food_id = f.id " "WHERE aliases_fts MATCH ? AND f.status != 'archived' LIMIT ?", (fts_terms, limit) ).fetchall() for row in alias_fts_rows: food = dict(row) fid = food.get('food_id', food['id']) if fid not in candidates: score = similarity_score(query, food['name']) candidates[fid] = {'food': food, 'score': max(score, 0.5), 'match_type': 'alias_fts'} except sqlite3.OperationalError: pass conn.close() # Sort by score descending and return top N sorted_candidates = sorted(candidates.values(), key=lambda x: x['score'], reverse=True)[:limit] results = [] for c in sorted_candidates: food = c['food'] # Get servings for this food servings = get_food_servings(food['id']) results.append({ 'id': food['id'], 'name': food['name'], 'brand': food.get('brand'), 'base_unit': food.get('base_unit', '100g'), 'calories_per_base': food.get('calories_per_base', 0), 'protein_per_base': food.get('protein_per_base', 0), 'carbs_per_base': food.get('carbs_per_base', 0), 'fat_per_base': food.get('fat_per_base', 0), 'status': food.get('status', 'confirmed'), 'image_path': food.get('image_path'), 'servings': servings, 'score': round(c['score'], 3), 'match_type': c['match_type'], }) return results # ─── External nutrition lookup (OpenFoodFacts + USDA) ──────────────────────── def _http_get_json(url: str, timeout: int = 15) -> dict | None: """Make a GET request and return parsed JSON, or None on failure.""" try: req = urllib.request.Request(url, headers={'User-Agent': 'CalorieTracker/1.0'}) 
with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read().decode('utf-8')) except (urllib.error.URLError, json.JSONDecodeError, TimeoutError, OSError) as e: logger.debug(f"External lookup failed for {url}: {e}") return None def search_openfoodfacts(query: str, limit: int = 5) -> list: """Search OpenFoodFacts for food products using v2 API. Returns list of {name, brand, barcode, calories_per_100g, protein, carbs, fat, serving_size, serving_unit, source} """ encoded = urllib.parse.quote(query) # v2 search API — more reliable than the old cgi endpoint # Try category search first (best for branded products) url = (f"https://world.openfoodfacts.org/api/v2/search?" f"categories_tags_en={encoded}&" f"fields=product_name,brands,code,nutriments,serving_size,serving_quantity&" f"page_size={limit}&sort_by=unique_scans_n") data = _http_get_json(url) # Fallback: v2 text search (search_terms param) if not data or not data.get('products'): url = (f"https://world.openfoodfacts.org/api/v2/search?" 
f"search_terms={encoded}&" f"fields=product_name,brands,code,nutriments,serving_size,serving_quantity&" f"page_size={limit}&sort_by=unique_scans_n") data = _http_get_json(url) if not data or 'products' not in data: return [] results = [] for p in data['products'][:limit]: nuts = p.get('nutriments', {}) cal = nuts.get('energy-kcal_100g') or 0 # If no kcal, try kJ and convert if not cal: kj = nuts.get('energy_100g') or 0 if kj: cal = round(float(kj) / 4.184, 1) name = p.get('product_name', '').strip() if not name or not cal: continue results.append({ 'name': name, 'brand': (p.get('brands') or '').split(',')[0].strip() or None, 'barcode': p.get('code'), 'calories_per_100g': round(float(cal), 1), 'protein_per_100g': round(float(nuts.get('proteins_100g', 0) or 0), 1), 'carbs_per_100g': round(float(nuts.get('carbohydrates_100g', 0) or 0), 1), 'fat_per_100g': round(float(nuts.get('fat_100g', 0) or 0), 1), 'serving_size_text': p.get('serving_size'), 'serving_grams': p.get('serving_quantity'), 'source': 'openfoodfacts', }) return results def lookup_openfoodfacts_barcode(barcode: str) -> dict | None: """Look up a specific product by barcode on OpenFoodFacts.""" url = f"https://world.openfoodfacts.org/api/v2/product/{barcode}.json?fields=product_name,brands,code,nutriments,serving_size,serving_quantity" data = _http_get_json(url) if not data or data.get('status') != 1 or 'product' not in data: return None p = data['product'] nuts = p.get('nutriments', {}) cal = nuts.get('energy-kcal_100g', 0) or nuts.get('energy_100g', 0) if cal and not nuts.get('energy-kcal_100g'): cal = round(cal / 4.184, 1) name = p.get('product_name', '').strip() if not name: return None return { 'name': name, 'brand': (p.get('brands') or '').split(',')[0].strip() or None, 'barcode': p.get('code'), 'calories_per_100g': round(float(cal), 1), 'protein_per_100g': round(float(nuts.get('proteins_100g', 0) or 0), 1), 'carbs_per_100g': round(float(nuts.get('carbohydrates_100g', 0) or 0), 1), 'fat_per_100g': 
round(float(nuts.get('fat_100g', 0) or 0), 1), 'serving_size_text': p.get('serving_size'), 'serving_grams': p.get('serving_quantity'), 'source': 'openfoodfacts', } def search_usda(query: str, limit: int = 5) -> list: """Search USDA FoodData Central for foods. Uses USDA_API_KEY env var, or falls back to DEMO_KEY (rate-limited). Free key: https://fdc.nal.usda.gov/api-key-signup.html """ api_key = USDA_API_KEY or 'DEMO_KEY' encoded = urllib.parse.quote(query) url = f"https://api.nal.usda.gov/fdc/v1/foods/search?api_key={api_key}&query={encoded}&pageSize={limit}&dataType=Foundation,SR%20Legacy" data = _http_get_json(url) if not data or 'foods' not in data: return [] results = [] for food in data['foods'][:limit]: # Build nutrient map with IDs for disambiguation nutrient_by_id = {} nutrient_by_name = {} for n in food.get('foodNutrients', []): nutrient_by_id[n.get('nutrientId')] = n.get('value', 0) nutrient_by_name[n.get('nutrientName', '')] = n.get('value', 0) # Energy: prefer kcal (nutrientId=1008) over kJ (nutrientId=1062) cal = nutrient_by_id.get(1008, 0) # Energy (kcal) if not cal: kj = nutrient_by_id.get(1062, 0) # Energy (kJ) if kj: cal = round(kj / 4.184, 1) if not cal: # Fallback: calculate from macros (4cal/g protein, 4cal/g carbs, 9cal/g fat) protein = nutrient_by_name.get('Protein', 0) or 0 carbs = nutrient_by_name.get('Carbohydrate, by difference', 0) or 0 fat = nutrient_by_name.get('Total lipid (fat)', 0) or 0 cal = round(protein * 4 + carbs * 4 + fat * 9, 1) protein = nutrient_by_name.get('Protein', 0) or 0 carbs = nutrient_by_name.get('Carbohydrate, by difference', 0) or 0 fat = nutrient_by_name.get('Total lipid (fat)', 0) or 0 name = food.get('description', '').strip() if not name or (not cal and not protein): continue # USDA names are uppercase, clean them up name = name.title() results.append({ 'name': name, 'brand': food.get('brandName'), 'barcode': food.get('gtinUpc'), 'calories_per_100g': round(float(cal), 1), 'protein_per_100g': 
round(float(protein), 1), 'carbs_per_100g': round(float(carbs), 1), 'fat_per_100g': round(float(fat), 1), 'serving_size_text': None, 'serving_grams': None, 'source': 'usda', 'usda_fdc_id': food.get('fdcId'), }) return results def search_external(query: str, limit: int = 5) -> list: """Search all external nutrition databases, deduplicate, and rank results.""" all_results = [] # Query both sources off_results = search_openfoodfacts(query, limit) usda_results = search_usda(query, limit) all_results.extend(off_results) all_results.extend(usda_results) if not all_results: return [] # Score by name similarity to query and deduplicate seen_names = set() scored = [] for r in all_results: norm = normalize_food_name(r['name']) if norm in seen_names: continue seen_names.add(norm) score = similarity_score(query, r['name']) r['relevance_score'] = round(score, 3) scored.append(r) # Sort by relevance scored.sort(key=lambda x: x['relevance_score'], reverse=True) return scored[:limit] def import_external_food(external_result: dict, user_id: str) -> dict: """Import a food from an external nutrition database into the local DB.""" serving_grams = external_result.get('serving_grams') serving_text = external_result.get('serving_size_text') servings = [{'name': '100g', 'amount_in_base': 1.0, 'is_default': not serving_grams}] if serving_grams and serving_text: servings.append({ 'name': serving_text, 'amount_in_base': round(float(serving_grams) / 100, 2), 'is_default': True, }) food = create_food({ 'name': external_result['name'], 'brand': external_result.get('brand'), 'barcode': external_result.get('barcode'), 'calories_per_base': external_result['calories_per_100g'], 'protein_per_base': external_result['protein_per_100g'], 'carbs_per_base': external_result['carbs_per_100g'], 'fat_per_base': external_result['fat_per_100g'], 'base_unit': '100g', 'status': 'confirmed', 'notes': f"Imported from {external_result.get('source', 'external')}", 'servings': servings, }, user_id) return food # ─── 
# Natural language parsing ───────────────────────────────────────────────────

def parse_food_request(raw_text: str) -> dict:
    """Parse a natural language food logging request.

    Extracts: food_description, meal_type, quantity, unit, modifiers, exclusions.

    Examples:
        'Log a single scoop hot fudge sundae from braums'
            -> food='hot fudge sundae', brand='braums', quantity=1,
               unit='scoop', meal_type=None
        'Log 2 mince tacos for lunch. No sour cream. Just mince, cheese and lettuce'
            -> food='mince tacos', quantity=2, meal_type='lunch',
               modifiers='just mince, cheese and lettuce', exclusions='no sour cream'
        'Add as a snack a scoop of vanilla ice cream. Not the strawberry shake'
            -> food='vanilla ice cream', quantity=1, unit='scoop',
               meal_type='snack', exclusions='not the strawberry shake'
    """
    text = raw_text.strip()
    # Defaults; fields are refined in-place as patterns are recognized below.
    result = {
        'food_description': text,
        'meal_type': None,
        'quantity': 1.0,
        'unit': 'serving',
        'brand': None,
        'modifiers': None,
        'exclusions': None,
        'original_text': text,
    }
    # Strip command prefixes: "log", "add", "track", "record"
    text = re.sub(r'^(?:log|add|track|record)\s+', '', text, flags=re.IGNORECASE).strip()
    # Extract meal type from text
    meal_patterns = [
        r'\bfor\s+(breakfast|lunch|dinner|snack)\b',
        r'\bas\s+(?:a\s+)?(breakfast|lunch|dinner|snack)\b',
        r'\b(breakfast|lunch|dinner|snack)\s*[:\-]\s*',
        r'\bto\s+(breakfast|lunch|dinner|snack)\b',
    ]
    for pattern in meal_patterns:
        m = re.search(pattern, text, re.IGNORECASE)
        if m:
            result['meal_type'] = m.group(1).lower()
            # Remove the matched phrase from the working text.
            text = text[:m.start()] + text[m.end():]
            text = text.strip().rstrip('.')
            break
    # Split into sentences for better parsing: the first sentence names the food,
    # later sentences carry modifiers/exclusions/quantity overrides.
    sentences = re.split(r'[.!]\s+', text)
    main_sentence = sentences[0] if sentences else text
    extra_sentences = sentences[1:] if len(sentences) > 1 else []
    # Extract exclusions from extra sentences and main:
    # "no sour cream", "not the strawberry", "make sure..."
    exclusion_parts = []
    modifier_parts = []
    remaining_extras = []
    for sent in extra_sentences:
        sent = sent.strip().rstrip('.')
        if re.match(r'^(?:no|not|without|don\'?t|never)\b', sent, re.IGNORECASE):
            exclusion_parts.append(sent)
        elif re.match(r'^(?:just|only|make\s+sure|with|extra|light|heavy)\b', sent, re.IGNORECASE):
            # "Make sure it is only vanilla ice cream" -> modifier
            modifier_parts.append(sent)
        elif re.match(r'^(\d+)\s+(?:of\s+them|small|medium|large)', sent, re.IGNORECASE):
            # "3 of them", "3 small ones" -> quantity override
            qty_m = re.match(r'^(\d+)', sent)
            if qty_m:
                result['quantity'] = float(qty_m.group(1))
            # Check for size modifier
            size_m = re.search(r'\b(small|medium|large)\b', sent, re.IGNORECASE)
            if size_m:
                modifier_parts.append(size_m.group(0).lower())
        else:
            # NOTE(review): collected but never read again in this function —
            # unrecognized extra sentences are currently dropped.
            remaining_extras.append(sent)
    # Also extract exclusions embedded in the main sentence.
    for pattern in [r'\.\s*(?:no|not|without|don\'?t)\s+[^.]*']:
        for m in re.finditer(pattern, main_sentence, re.IGNORECASE):
            exclusion_parts.append(m.group().strip().lstrip('. '))
        main_sentence = re.sub(pattern, '', main_sentence, flags=re.IGNORECASE)
    # Extract inline modifiers from main: "light sauce", "with cheese"
    for pattern in [r'\b(?:just|only|with|extra|light|heavy)\s+[\w,\s]+$']:
        m = re.search(pattern, main_sentence, re.IGNORECASE)
        if m:
            modifier_parts.append(m.group().strip())
            main_sentence = main_sentence[:m.start()].strip()
    if exclusion_parts:
        result['exclusions'] = '; '.join(exclusion_parts)
    if modifier_parts:
        result['modifiers'] = '; '.join(modifier_parts)
    # Extract brand: "from braums", "from mcdonalds"
    brand_match = re.search(r'\bfrom\s+(\w[\w\s\']*?)(?:\s*[.,]|\s*$)', main_sentence, re.IGNORECASE)
    if brand_match:
        result['brand'] = brand_match.group(1).strip()
        main_sentence = main_sentence[:brand_match.start()] + main_sentence[brand_match.end():]
    # Clean up main sentence
    main_sentence = re.sub(r'\s+', ' ', main_sentence).strip().rstrip('.,;')
    # Handle "N of them" pattern still in main sentence
    of_them = re.search(r'\.?\s*(\d+)\s+of\s+them\b', main_sentence, re.IGNORECASE)
    if of_them:
        result['quantity'] = float(of_them.group(1))
        main_sentence = main_sentence[:of_them.start()].strip()
    # Now parse quantity from the cleaned food description
    parsed_qty = _parse_quantity_from_phrase(main_sentence)
    # Only override quantity from phrase parsing if we didn't already get it
    # from "N of them"/"N small ones"
    if result['quantity'] == 1.0 or parsed_qty['quantity'] != 1.0:
        if parsed_qty['quantity'] != 1.0:
            result['quantity'] = parsed_qty['quantity']
        result['unit'] = parsed_qty['unit']
        result['food_description'] = parsed_qty['food_name']
    # Strip filler: "a", "an", "some", "single" from start
    result['food_description'] = re.sub(r'^(?:a|an|some)\s+', '', result['food_description'],
                                        flags=re.IGNORECASE).strip()
    # Handle "single scoop X" -> qty=1, unit=scoop, food=X
    scoop_match = re.match(r'^(?:single\s+)?(?:scoop|scoops)\s+(?:of\s+)?(.+)$',
                           result['food_description'], re.IGNORECASE)
    if scoop_match:
        result['unit'] = 'scoop'
        result['food_description'] = scoop_match.group(1).strip()
    # Also catch "scoop of X" in the food name
    scoop_match2 = re.match(r'^(?:scoop|scoops)\s+(?:of\s+)?(.+)$',
                            result['food_description'], re.IGNORECASE)
    if scoop_match2:
        result['unit'] = 'scoop'
        result['food_description'] = scoop_match2.group(1).strip()
    # Remove "single" prefix if still there
    result['food_description'] = re.sub(r'^single\s+', '', result['food_description'],
                                        flags=re.IGNORECASE).strip()
    return result


def _ai_split_items(phrase: str) -> list[str]:
    """Use AI to split a multi-food phrase into individual items.

    Returns a list of strings, each describing one food with its quantity.
    Falls back to returning the original phrase as a single item if AI unavailable.
    """
    if not OPENAI_API_KEY:
        return [phrase]
    prompt = f"""Split this food description into individual food items. Each item should include its quantity.

Input: "{phrase}"

Rules:
- Return a JSON array of strings
- Each string is one food item with its quantity
- Keep quantities attached to their food: "2 eggs" stays as "2 eggs", not "eggs"
- "half an egg" → "0.5 egg"
- "one porotta" → "1 porotta"
- If it's clearly one dish (like "chicken fried rice" or "egg sandwich"), keep it as one item
- Convert words to numbers: "one" → "1", "two" → "2", "half" → "0.5"
- Return ONLY the JSON array, no other text

Examples:
- "2 eggs and toast" → ["2 eggs", "1 toast"]
- "half egg and one porotta" → ["0.5 egg", "1 porotta"]
- "1 porotta, 1 egg, 1 slice cheese" → ["1 porotta", "1 egg", "1 slice cheese"]
- "chicken fried rice" → ["1 chicken fried rice"]
- "egg sandwich with cheese" → ["1 egg sandwich with cheese"]
- "2 rotis with dal and rice" → ["2 roti", "1 dal", "1 rice"]"""
    try:
        req_body = json.dumps({
            "model": OPENAI_MODEL,
            "messages": [
                {"role": "system", "content": "You split food descriptions into individual items. Return ONLY a JSON array of strings."},
                {"role": "user", "content": prompt}
            ],
            # Low temperature: we want deterministic splitting, not creativity.
            "temperature": 0.1,
        }).encode('utf-8')
        req = urllib.request.Request(
            "https://api.openai.com/v1/chat/completions",
            data=req_body,
            headers={
                'Content-Type': 'application/json',
                'Authorization': f'Bearer {OPENAI_API_KEY}',
            },
        )
        with urllib.request.urlopen(req, timeout=15) as resp:
            data = json.loads(resp.read().decode('utf-8'))
        content = data['choices'][0]['message']['content'].strip()
        # Strip a markdown code fence if the model wrapped its answer in one.
        if content.startswith('```'):
            content = content.split('```')[1]
            if content.startswith('json'):
                content = content[4:]
        items = json.loads(content)
        if isinstance(items, list) and len(items) > 0:
            return [str(i).strip() for i in items if str(i).strip()]
    except Exception as e:
        # Best-effort: any network/parse failure falls through to the single-item fallback.
        logger.warning(f"AI split failed: {e}")
    return [phrase]


# ─── AI nutrition estimation ─────────────────────────────────────────────────

def _ai_estimate_nutrition(food_description: str, modifiers: str = None,
                           exclusions: str = None, brand: str = None,
                           quantity: float = 1, unit: str = 'serving') -> dict | None:
    """Use OpenAI to estimate nutrition for a food item.

    Returns:
        {food_name, calories_per_base, protein_per_base, carbs_per_base,
         fat_per_base, base_unit, serving_description, estimated_grams, confidence}
        or None when no API key is configured or the request/parse fails.
    """
    if not OPENAI_API_KEY:
        return None
    # Build the prompt
    desc_parts = [food_description]
    if brand:
        desc_parts.append(f"from {brand}")
    if modifiers:
        desc_parts.append(f"({modifiers})")
    if exclusions:
        desc_parts.append(f"[{exclusions}]")
    full_description = ' '.join(desc_parts)
    prompt = f"""Estimate nutritional information for: {full_description}
Quantity specified: {quantity} {unit}

Return a JSON object with these fields:
- food_name: The FULL name of what was actually eaten, including key additions/toppings. Title case. Include brand if relevant.
  Examples:
  - "yogurt with a sprinkle of honey" → "Yogurt With Honey"
  - "mince tacos no sour cream" → "Mince Tacos (No Sour Cream)"
  - "smash burger" → "Homemade Smash Burger"
  - "Bellwether Farms yogurt with honey" → "Bellwether Farms Yogurt With Honey"
  Do NOT strip additions that change nutrition (honey, cheese, sauce, toppings). DO strip quantities/measurements from the name.
- display_name: Short version for reuse, without one-off modifiers. E.g. "Bellwether Farms Yogurt" (without the honey)
- calories: Total calories for the ENTIRE specified quantity ({quantity} {unit})
- protein: Total grams of protein for the entire quantity
- carbs: Total grams of carbohydrates for the entire quantity
- fat: Total grams of fat for the entire quantity
- per_serving_calories: Calories for ONE serving/piece
- per_serving_protein: Protein for ONE serving/piece
- per_serving_carbs: Carbs for ONE serving/piece
- per_serving_fat: Fat for ONE serving/piece
- base_unit: What one unit is — "piece", "scoop", "serving", "slice", etc.
- serving_description: Human-readable serving label, e.g. "1 taco", "1 scoop", "1 small pie"
- estimated_grams: Approximate grams per serving
- confidence: "high", "medium", or "low"

IMPORTANT:
- Account for all modifiers, additions, and exclusions in the calorie/macro estimate
- The food_name should reflect what was ACTUALLY eaten (with honey, without sour cream, etc.)
- The display_name should be the reusable base food (without one-off additions)
- If a brand is specified, use brand-specific nutrition if you know it
- Be realistic about portion sizes
- Return ONLY the JSON object, no other text"""
    try:
        req_body = json.dumps({
            "model": OPENAI_MODEL,
            "messages": [
                {"role": "system", "content": "You are a nutrition expert. Return accurate JSON nutrition estimates only."},
                {"role": "user", "content": prompt}
            ],
            "temperature": 0.2,
        }).encode('utf-8')
        req = urllib.request.Request(
            "https://api.openai.com/v1/chat/completions",
            data=req_body,
            headers={
                'Content-Type': 'application/json',
                'Authorization': f'Bearer {OPENAI_API_KEY}',
            },
        )
        with urllib.request.urlopen(req, timeout=30) as resp:
            data = json.loads(resp.read().decode('utf-8'))
        content = data['choices'][0]['message']['content'].strip()
        # Strip markdown code blocks if present
        if content.startswith('```'):
            content = content.split('```')[1]
            if content.startswith('json'):
                content = content[4:]
        result = json.loads(content)
        # Validate and normalize: map the model's qualitative confidence onto
        # the numeric scale used by the resolution thresholds.
        confidence_map = {'high': 0.85, 'medium': 0.65, 'low': 0.4}
        return {
            'food_name': str(result.get('food_name', food_description)).strip(),
            'display_name': str(result.get('display_name', result.get('food_name', food_description))).strip(),
            'calories': float(result.get('calories', 0)),
            'protein': float(result.get('protein', 0)),
            'carbs': float(result.get('carbs', 0)),
            'fat': float(result.get('fat', 0)),
            # Per-serving figures fall back to the totals when absent.
            'calories_per_base': float(result.get('per_serving_calories', result.get('calories', 0))),
            'protein_per_base': float(result.get('per_serving_protein', result.get('protein', 0))),
            'carbs_per_base': float(result.get('per_serving_carbs', result.get('carbs', 0))),
            'fat_per_base': float(result.get('per_serving_fat', result.get('fat', 0))),
            'base_unit': str(result.get('base_unit', 'serving')),
            'serving_description': str(result.get('serving_description', f'1 {unit}')),
            'estimated_grams': float(result.get('estimated_grams', 0)) if result.get('estimated_grams') else None,
            'confidence': confidence_map.get(result.get('confidence', 'medium'), 0.65),
            'source': 'ai',
        }
    except Exception as e:
        logger.warning(f"AI estimation failed: {e}")
        return None


def _parse_quantity_from_phrase(phrase: str) -> dict:
    """Extract quantity, unit, and clean food name from a raw phrase.
    Examples:
        '2 cups rice' -> {quantity: 2, unit: 'cup', food_name: 'rice'}
        '3 chicken breasts' -> {quantity: 3, unit: 'piece', food_name: 'chicken breasts'}
        'small bowl biryani' -> {quantity: 1, unit: 'small bowl', food_name: 'biryani'}
        'chicken breast' -> {quantity: 1, unit: 'serving', food_name: 'chicken breast'}
    """
    phrase = phrase.strip()
    # Household portions: checked first so "small bowl" isn't mistaken for a count.
    household = {
        'small bowl': 'small bowl',
        'medium bowl': 'medium bowl',
        'large bowl': 'large bowl',
        'small plate': 'small plate',
        'medium plate': 'medium plate',
        'full plate': 'full plate',
        'half plate': 'half plate',
        'handful': 'handful',
        'bite': 'bite',
    }
    for pattern, unit in household.items():
        match = re.match(rf'^(?:(\d+\.?\d*)\s+)?{re.escape(pattern)}\s+(?:of\s+)?(.+)$',
                         phrase, re.IGNORECASE)
        if match:
            # The leading count is optional ("small bowl biryani" -> quantity 1).
            return {'quantity': float(match.group(1) or 1), 'unit': unit,
                    'food_name': match.group(2).strip()}
    # Numeric + unit patterns: "2 cups rice", "4oz chicken", "100g rice"
    unit_map = {
        'cups?': 'cup',
        'tbsps?|tablespoons?': 'tbsp',
        'tsps?|teaspoons?': 'tsp',
        'oz|ounces?': 'oz',
        'g|grams?': 'g',
        'pieces?': 'piece',
        'slices?': 'slice',
        'servings?': 'serving',
    }
    for pattern, unit in unit_map.items():
        match = re.match(rf'^(\d+\.?\d*)\s*(?:{pattern})\s+(?:of\s+)?(.+)$',
                         phrase, re.IGNORECASE)
        if match:
            return {'quantity': float(match.group(1)), 'unit': unit,
                    'food_name': match.group(2).strip()}
    # Just a number prefix: "2 chicken breasts", "3 eggs"
    match = re.match(r'^(\d+\.?\d*)\s+(.+)$', phrase)
    if match:
        qty = float(match.group(1))
        if qty <= 50:  # Reasonable food quantity; larger numbers are likely part of the name
            return {'quantity': qty, 'unit': 'piece', 'food_name': match.group(2).strip()}
    return {'quantity': 1, 'unit': 'serving', 'food_name': phrase}


# Confidence thresholds (conservative: prefer false negatives over wrong auto-matches)
THRESHOLD_AUTO_MATCH = 0.9
THRESHOLD_CONFIRM = 0.65


def resolve_food(raw_phrase: str, user_id: str, meal_type: str = None,
                 portion_text: str = None, entry_date: str = None,
                 source: str = 'api') -> dict:
    """Smart food resolution endpoint.

    Resolution chain: quick-add check → parse NL → local DB → AI estimation → queue

    Stable response shape:
    {
        resolution_type: matched | confirm | queued | quick_add | ai_estimated
        confidence: float
        matched_food: {...} | null
        candidate_foods: [...] | []
        ai_estimate: {...} | null
        parsed: {quantity, unit, food_description, meal_type, brand, modifiers, exclusions}
        raw_text: str
        queue_id: str | null
        reason: str
    }
    """
    # Step 0: Parse natural language
    parsed = parse_food_request(raw_phrase)
    # Use explicitly provided meal_type if given, otherwise use parsed
    effective_meal = meal_type or parsed['meal_type']
    base_response = {
        'resolution_type': None,
        'confidence': 0.0,
        'matched_food': None,
        'candidate_foods': [],
        'ai_estimate': None,
        'parsed': {
            'quantity': parsed['quantity'],
            'unit': parsed['unit'],
            'food_description': parsed['food_description'],
            'meal_type': effective_meal,
            'brand': parsed['brand'],
            'modifiers': parsed['modifiers'],
            'exclusions': parsed['exclusions'],
        },
        'raw_text': raw_phrase,
        'queue_id': None,
        'reason': None,
    }
    # Step 1: Quick-add check ("450 calories", "300cal")
    quick_add_match = re.match(
        r'^(?:(?:log|add|track)\s+)?(\d+\.?\d*)\s*(?:cal(?:ories)?|kcal)(?:\s+.*)?$',
        raw_phrase.strip(), re.IGNORECASE
    )
    if quick_add_match:
        calories = float(quick_add_match.group(1))
        # For quick-add, the "quantity" field carries the calorie count (unit 'kcal').
        return {**base_response,
                'resolution_type': 'quick_add',
                'confidence': 1.0,
                'parsed': {**base_response['parsed'], 'quantity': calories, 'unit': 'kcal'},
                'reason': 'Detected quick-add calorie pattern',
                }
    # Step 2: Search local DB
    # NOTE(review): search_foods is defined elsewhere in this file — assumed to return
    # candidate dicts carrying at least 'id', 'name', 'score', 'match_type'.
    food_name = parsed['food_description']
    candidates = search_foods(food_name, user_id, limit=5)
    # Step 2b: Retry with singularized name if no strong match
    best_local_score = candidates[0]['score'] if candidates else 0
    if best_local_score < THRESHOLD_AUTO_MATCH:
        alt_name = _naive_singularize(food_name)
        if alt_name.lower() != food_name.lower():
            alt_candidates = search_foods(alt_name, user_id, limit=5)
            seen_ids = {c['id'] for c in candidates}
            for ac in alt_candidates:
                if ac['id'] not in seen_ids:
                    candidates.append(ac)
                    seen_ids.add(ac['id'])
            candidates.sort(key=lambda c: c['score'], reverse=True)
            best_local_score = candidates[0]['score'] if candidates else 0
    # Build note from modifiers/exclusions (used for all match types)
    mod_note_parts = []
    if parsed['modifiers']:
        mod_note_parts.append(parsed['modifiers'])
    if parsed['exclusions']:
        mod_note_parts.append(parsed['exclusions'])
    mod_note = '; '.join(mod_note_parts) if mod_note_parts else None
    # Build snapshot name override: food name + modifiers if present
    snapshot_override = None
    if mod_note_parts and candidates:
        base_name = candidates[0]['name']
        snapshot_override = f"{base_name} ({', '.join(mod_note_parts)})"
    if best_local_score >= THRESHOLD_AUTO_MATCH:
        return {**base_response,
                'resolution_type': 'matched',
                'confidence': best_local_score,
                'matched_food': candidates[0],
                'candidate_foods': candidates[:3],
                'snapshot_name_override': snapshot_override,
                'note': mod_note,
                'reason': f'High confidence local match ({best_local_score:.2f}) via {candidates[0]["match_type"]}',
                }
    # Step 3: AI estimation (if OpenAI is configured and we don't have a strong local match)
    ai_estimate = None
    if best_local_score < THRESHOLD_CONFIRM:
        ai_estimate = _ai_estimate_nutrition(
            food_description=parsed['food_description'],
            modifiers=parsed['modifiers'],
            exclusions=parsed['exclusions'],
            brand=parsed['brand'],
            quantity=parsed['quantity'],
            unit=parsed['unit'],
        )
        if ai_estimate:
            base_response['ai_estimate'] = ai_estimate
    # Step 5: Decide resolution type based on what we found
    # Local confirm-level match exists — auto-match it (no queue)
    if best_local_score >= THRESHOLD_CONFIRM:
        return {**base_response,
                'resolution_type': 'matched',
                'confidence': best_local_score,
                'matched_food': candidates[0],
                'candidate_foods': candidates[:3],
                'snapshot_name_override': snapshot_override,
                'note': mod_note,
                'reason': f'Auto-matched local food ({best_local_score:.2f}) via {candidates[0]["match_type"]}',
                }
    # AI estimated successfully — check for existing canonical food before creating
    if ai_estimate:
        ai_display_name = ai_estimate.get('display_name', ai_estimate['food_name'])
        # Pre-creation dedup: search for the AI's canonical name in local DB
        dedup_candidates = search_foods(ai_display_name, user_id, limit=3)
        # Also try singularized form
        alt_dedup = _naive_singularize(ai_display_name)
        if alt_dedup.lower() != ai_display_name.lower():
            for ac in search_foods(alt_dedup, user_id, limit=3):
                if not any(c['id'] == ac['id'] for c in dedup_candidates):
                    dedup_candidates.append(ac)
        dedup_candidates.sort(key=lambda c: c['score'], reverse=True)
        existing_match = None
        for dc in dedup_candidates:
            if dc['score'] >= THRESHOLD_CONFIRM:
                existing_match = dc
                break
        if existing_match:
            # Reuse existing canonical food — no new food created
            logger.info(f"Dedup: reusing existing '{existing_match['name']}' for AI input '{ai_display_name}'")
            matched = {
                'id': existing_match['id'],
                'name': existing_match['name'],
                'brand': existing_match.get('brand'),
                'base_unit': existing_match.get('base_unit', 'serving'),
                'calories_per_base': existing_match.get('calories_per_base', 0),
                'protein_per_base': existing_match.get('protein_per_base', 0),
                'carbs_per_base': existing_match.get('carbs_per_base', 0),
                'fat_per_base': existing_match.get('fat_per_base', 0),
                'status': existing_match.get('status', 'confirmed'),
                'servings': existing_match.get('servings', []),
                'score': existing_match['score'],
                'match_type': 'ai_dedup_matched',
            }
        else:
            # No existing match — create new canonical food
            new_food = create_food({
                'name': ai_display_name,
                'brand': parsed['brand'],
                'calories_per_base': ai_estimate['calories_per_base'],
                'protein_per_base': ai_estimate['protein_per_base'],
                'carbs_per_base': ai_estimate['carbs_per_base'],
                'fat_per_base': ai_estimate['fat_per_base'],
                'base_unit': ai_estimate['base_unit'],
                'status': 'ai_created',
                'notes': f"AI estimated from: {raw_phrase}",
                'servings': [{
                    'name': ai_estimate['serving_description'],
                    'amount_in_base': 1.0,
                    'is_default': True,
                }],
            }, user_id)
            matched = {
                'id': new_food['id'],
                'name': new_food['name'],
                'brand': new_food.get('brand'),
                'base_unit': new_food.get('base_unit', 'serving'),
                'calories_per_base': new_food.get('calories_per_base', 0),
                'protein_per_base': new_food.get('protein_per_base', 0),
                'carbs_per_base': new_food.get('carbs_per_base', 0),
                'fat_per_base': new_food.get('fat_per_base', 0),
                'status': 'ai_created',
                'servings': new_food.get('servings', []),
                'score': ai_estimate['confidence'],
                'match_type': 'ai_created',
            }
        # Build note from modifiers/exclusions
        note_parts = []
        if parsed['modifiers']:
            note_parts.append(parsed['modifiers'])
        if parsed['exclusions']:
            note_parts.append(parsed['exclusions'])
        return {**base_response,
                'resolution_type': 'ai_estimated',
                'confidence': ai_estimate['confidence'],
                'matched_food': matched,
                'snapshot_name_override': ai_estimate['food_name'],  # Full name with modifiers for the entry
                'note': '; '.join(note_parts) if note_parts else None,
                'reason': f'AI estimated nutrition for "{ai_estimate["food_name"]}" (confidence: {ai_estimate["confidence"]:.2f})',
                }
    # Nothing found — return as quick_add so the entry can still be created
    # The gateway/frontend will handle it as a basic calorie entry
    return {**base_response,
            'resolution_type': 'quick_add',
            'confidence': 0.0,
            'candidate_foods': candidates[:3],
            'reason': 'No match found — use as quick add entry',
            }


def _create_resolution_queue_entry(user_id, raw_text, candidates, confidence,
                                   meal_type=None, entry_date=None, source=None,
                                   proposed_food_id=None) -> str:
    """Create an entry in the food resolution queue.

    Candidates are stored as a compact JSON summary (id/name/score only);
    returns the new queue row's UUID.
    """
    queue_id = str(uuid.uuid4())
    conn = get_db()
    conn.execute(
        """INSERT INTO food_resolution_queue
           (id, user_id, raw_text, proposed_food_id, candidates_json, confidence,
            meal_type, entry_date, source)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (queue_id, user_id, raw_text, proposed_food_id,
         json.dumps([{'food_id': c['id'], 'name': c['name'], 'score': c['score']} for c
in candidates]),
         confidence, meal_type, entry_date, source)
    )
    conn.commit()
    conn.close()
    return queue_id


# ─── Food CRUD helpers ──────────────────────────────────────────────────────

def get_food_servings(food_id: str) -> list:
    """Get all serving definitions for a food (default serving first)."""
    conn = get_db()
    rows = conn.execute(
        "SELECT * FROM food_servings WHERE food_id = ? ORDER BY is_default DESC, name",
        (food_id,)
    ).fetchall()
    conn.close()
    return [dict(r) for r in rows]


def create_food(data: dict, user_id: str) -> dict:
    """Create a new food with optional servings and aliases.

    Inserts the food row, its serving definitions (auto-creating a default
    "1 <base_unit>" serving when none are supplied), and name-derived aliases,
    then kicks off an image fetch and returns the fully-loaded food dict.
    """
    food_id = str(uuid.uuid4())
    name = data.get('name', '').strip()
    normalized = normalize_food_name(name)
    brand = data.get('brand')
    brand_norm = normalize_food_name(brand) if brand else None
    conn = get_db()
    conn.execute(
        """INSERT INTO foods
           (id, name, normalized_name, brand, brand_normalized, barcode, notes,
            calories_per_base, protein_per_base, carbs_per_base, fat_per_base,
            base_unit, status, created_by_user_id, is_shared)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (food_id, name, normalized, brand, brand_norm, data.get('barcode'),
         data.get('notes'), data.get('calories_per_base', 0),
         data.get('protein_per_base', 0), data.get('carbs_per_base', 0),
         data.get('fat_per_base', 0), data.get('base_unit', '100g'),
         data.get('status', 'confirmed'), user_id,
         1 if data.get('is_shared', True) else 0)
    )
    # Create default serving if provided
    servings = data.get('servings', [])
    if not servings:
        # Auto-create a default "1 serving" entry
        servings = [{'name': f'1 {data.get("base_unit", "serving")}',
                     'amount_in_base': 1.0, 'is_default': True}]
    for s in servings:
        conn.execute(
            "INSERT INTO food_servings (id, food_id, name, amount_in_base, is_default) VALUES (?, ?, ?, ?, ?)",
            (str(uuid.uuid4()), food_id, s.get('name', '1 serving'),
             s.get('amount_in_base', 1.0), 1 if s.get('is_default') else 0)
        )
    # Create aliases from name tokens
    _auto_create_aliases(conn, food_id, name)
    conn.commit()
    conn.close()
    # Auto-fetch image in background
    # NOTE(review): auto_fetch_food_image is defined elsewhere in this file —
    # presumably non-blocking; verify it does not stall food creation.
    auto_fetch_food_image(food_id, name, data.get('brand'))
    return get_food_by_id(food_id)


def _auto_create_aliases(conn, food_id: str, name: str):
    """Auto-create aliases from the food name
    (e.g., 'Grilled Chicken Breast' -> ['chicken breast', 'grilled chicken']).

    Uses the caller's open connection; does not commit.
    """
    normalized = normalize_food_name(name)
    tokens = normalized.split()
    aliases_to_add = set()
    aliases_to_add.add(normalized)
    # Add subsets of 2+ consecutive tokens
    if len(tokens) >= 2:
        for i in range(len(tokens)):
            for j in range(i + 2, len(tokens) + 1):
                subset = ' '.join(tokens[i:j])
                if len(subset) >= 3:
                    aliases_to_add.add(subset)
    for alias in aliases_to_add:
        try:
            # Aliases are built from normalized text, so the same value is stored
            # in both the alias and alias_normalized columns.
            conn.execute(
                "INSERT OR IGNORE INTO food_aliases (id, food_id, alias, alias_normalized) VALUES (?, ?, ?, ?)",
                (str(uuid.uuid4()), food_id, alias, alias)
            )
        except sqlite3.IntegrityError:
            pass  # Alias already exists for another food


def get_food_by_id(food_id: str) -> dict | None:
    """Get a food by ID with its servings and aliases, or None if absent."""
    conn = get_db()
    row = conn.execute("SELECT * FROM foods WHERE id = ?", (food_id,)).fetchone()
    if not row:
        conn.close()
        return None
    food = dict(row)
    food['servings'] = [dict(r) for r in conn.execute(
        "SELECT * FROM food_servings WHERE food_id = ? ORDER BY is_default DESC",
        (food_id,)
    ).fetchall()]
    food['aliases'] = [dict(r) for r in conn.execute(
        "SELECT * FROM food_aliases WHERE food_id = ?", (food_id,)
    ).fetchall()]
    conn.close()
    return food


def _audit_log(user_id: str, action: str, entity_type: str, entity_id: str,
               details: dict = None):
    """Write an audit log entry; details (if any) are stored as JSON."""
    conn = get_db()
    conn.execute(
        "INSERT INTO audit_log (id, user_id, action, entity_type, entity_id, details) VALUES (?, ?, ?, ?, ?, ?)",
        (str(uuid.uuid4()), user_id, action, entity_type, entity_id,
         json.dumps(details) if details else None)
    )
    conn.commit()
    conn.close()


def merge_foods(source_id: str, target_id: str, user_id: str = None) -> dict:
    """Merge source food into target: move aliases, repoint entries, archive source."""
    conn = get_db()
    source = conn.execute("SELECT * FROM foods WHERE id = ?", (source_id,)).fetchone()
    target = conn.execute("SELECT * FROM foods WHERE id = ?", (target_id,)).fetchone()
    if not source or not target:
        conn.close()
        return {'error': 'Source or target food not found'}
    # Move aliases from source to target (skip duplicates)
    source_aliases = conn.execute("SELECT * FROM food_aliases WHERE food_id = ?", (source_id,)).fetchall()
    for alias in source_aliases:
        try:
            conn.execute(
                "UPDATE food_aliases SET food_id = ? WHERE id = ?", (target_id, alias['id'])
            )
        except sqlite3.IntegrityError:
            # Target already has this alias — drop the duplicate instead of moving it.
            conn.execute("DELETE FROM food_aliases WHERE id = ?", (alias['id'],))
    # Add source name as alias on target
    try:
        conn.execute(
            "INSERT OR IGNORE INTO food_aliases (id, food_id, alias, alias_normalized) VALUES (?, ?, ?, ?)",
            (str(uuid.uuid4()), target_id, source['name'], normalize_food_name(source['name']))
        )
    except sqlite3.IntegrityError:
        pass
    # Move food_entries references (snapshots are immutable, just update food_id reference)
    conn.execute("UPDATE food_entries SET food_id = ? WHERE food_id = ?", (target_id, source_id))
    # Move template items
    conn.execute("UPDATE meal_template_items SET food_id = ? WHERE food_id = ?", (target_id, source_id))
    # Move favorites
    # NOTE(review): despite the comment, source favorites are deleted, not repointed
    # to the target — confirm this is intentional.
    conn.execute("DELETE FROM user_favorites WHERE food_id = ?", (source_id,))
    # Move servings (skip duplicates by name)
    source_servings = conn.execute("SELECT * FROM food_servings WHERE food_id = ?", (source_id,)).fetchall()
    for serving in source_servings:
        existing = conn.execute(
            "SELECT id FROM food_servings WHERE food_id = ? AND name = ?",
            (target_id, serving['name'])
        ).fetchone()
        if not existing:
            conn.execute(
                "UPDATE food_servings SET food_id = ? WHERE id = ?", (target_id, serving['id'])
            )
        else:
            conn.execute("DELETE FROM food_servings WHERE id = ?", (serving['id'],))
    # Archive source (kept for history rather than deleted)
    conn.execute("UPDATE foods SET status = 'archived' WHERE id = ?", (source_id,))
    conn.commit()
    conn.close()
    # Audit trail
    _audit_log(user_id, 'food_merged', 'food', target_id, {
        'source_id': source_id,
        'source_name': source['name'],
        'target_id': target_id,
        'target_name': target['name'],
    })
    return {'success': True, 'source_id': source_id, 'target_id': target_id}


# ─── Entry helpers ───────────────────────────────────────────────────────────

def calculate_entry_nutrition(food: dict, quantity: float, serving_id: str = None) -> dict:
    """Calculate nutrition for a food entry based on quantity and serving.

    Without a serving_id, quantity is in base units; with one, quantity is
    multiplied by that serving's amount_in_base. Values rounded to 1 decimal.
    """
    base_calories = food.get('calories_per_base', 0)
    base_protein = food.get('protein_per_base', 0)
    base_carbs = food.get('carbs_per_base', 0)
    base_fat = food.get('fat_per_base', 0)
    # If a specific serving is selected, multiply by its base amount
    multiplier = quantity
    if serving_id:
        conn = get_db()
        serving = conn.execute("SELECT * FROM food_servings WHERE id = ?", (serving_id,)).fetchone()
        conn.close()
        if serving:
            multiplier = quantity * serving['amount_in_base']
    return {
        'calories': round(base_calories * multiplier, 1),
        'protein': round(base_protein * multiplier, 1),
        'carbs': round(base_carbs * multiplier, 1),
        'fat': round(base_fat * multiplier, 1),
    }


def create_food_entry(data: dict, user_id: str) -> dict:
    """Create a food log
entry with immutable nutrition snapshot.

    Nutrition values are copied into snapshot_* columns at logging time so
    later edits to the food do not rewrite history. Supports 'quick_add'
    entries (calories only, no food reference) and an optional idempotency
    key to make retries safe.
    """
    # Idempotency check: if key provided and already used, return existing entry
    idempotency_key = data.get('idempotency_key')
    if idempotency_key:
        conn = get_db()
        existing = conn.execute(
            "SELECT * FROM food_entries WHERE idempotency_key = ?", (idempotency_key,)
        ).fetchone()
        conn.close()
        if existing:
            return dict(existing)
    entry_id = str(uuid.uuid4())
    entry_type = data.get('entry_type', 'food')
    food_id = data.get('food_id')
    meal_type = data.get('meal_type', 'snack').lower()
    entry_date = data.get('entry_date', date.today().isoformat())
    quantity = data.get('quantity', 1.0)
    unit = data.get('unit', 'serving')
    serving_description = data.get('serving_description')
    source = data.get('source', 'web')
    entry_method = data.get('entry_method', 'manual')
    image_ref = data.get('image_ref')
    snapshot_serving_label = None
    snapshot_grams = data.get('snapshot_grams')
    # Quick-add: just calories, no food reference
    if entry_type == 'quick_add':
        snapshot_name = data.get('snapshot_food_name', 'Quick add')
        snapshot_cals = data.get('snapshot_calories', 0)
        snapshot_protein = data.get('snapshot_protein', 0)
        snapshot_carbs = data.get('snapshot_carbs', 0)
        snapshot_fat = data.get('snapshot_fat', 0)
        entry_method = 'quick_add'
    else:
        # Get food and calculate nutrition snapshot
        food = get_food_by_id(food_id)
        if not food:
            return {'error': 'Food not found'}
        serving_id = data.get('serving_id')
        nutrition = calculate_entry_nutrition(food, quantity, serving_id)
        snapshot_name = data.get('snapshot_food_name_override') or food['name']
        snapshot_cals = nutrition['calories']
        snapshot_protein = nutrition['protein']
        snapshot_carbs = nutrition['carbs']
        snapshot_fat = nutrition['fat']
        # Resolve serving label and grams for snapshot
        if serving_id:
            for s in food.get('servings', []):
                if s['id'] == serving_id:
                    snapshot_serving_label = s['name']
                    if not serving_description:
                        serving_description = f"{quantity} x {s['name']}"
                    # Calculate grams if base_unit is 100g
                    if food.get('base_unit') == '100g':
                        snapshot_grams = round(quantity * s['amount_in_base'] * 100, 1)
                    break
        elif food.get('base_unit') == '100g':
            snapshot_grams = round(quantity * 100, 1)
    conn = get_db()
    conn.execute(
        """INSERT INTO food_entries
           (id, user_id, food_id, meal_type, entry_date, entry_type, quantity, unit,
            serving_description, snapshot_food_name, snapshot_serving_label, snapshot_grams,
            snapshot_calories, snapshot_protein, snapshot_carbs, snapshot_fat,
            source, entry_method, raw_text, confidence_score, note, image_ref,
            ai_metadata, idempotency_key)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (entry_id, user_id, food_id, meal_type, entry_date, entry_type, quantity, unit,
         serving_description, snapshot_name, snapshot_serving_label, snapshot_grams,
         snapshot_cals, snapshot_protein, snapshot_carbs, snapshot_fat,
         source, entry_method, data.get('raw_text'), data.get('confidence_score'),
         data.get('note'), image_ref,
         json.dumps(data.get('ai_metadata')) if data.get('ai_metadata') else None,
         idempotency_key)
    )
    conn.commit()
    conn.close()
    return {
        'id': entry_id,
        'food_id': food_id,
        'meal_type': meal_type,
        'entry_date': entry_date,
        'entry_type': entry_type,
        'quantity': quantity,
        'unit': unit,
        'serving_description': serving_description,
        'snapshot_food_name': snapshot_name,
        'snapshot_serving_label': snapshot_serving_label,
        'snapshot_grams': snapshot_grams,
        'snapshot_calories': snapshot_cals,
        'snapshot_protein': snapshot_protein,
        'snapshot_carbs': snapshot_carbs,
        'snapshot_fat': snapshot_fat,
        'source': source,
        'entry_method': entry_method,
    }


def get_entries_by_date(user_id: str, entry_date: str) -> list:
    """Get all food entries for a user on a specific date, with food image.

    Ordered by meal (breakfast, lunch, dinner, snack) then creation time.
    """
    conn = get_db()
    rows = conn.execute(
        """SELECT fe.*, f.image_path as food_image_path
           FROM food_entries fe
           LEFT JOIN foods f ON fe.food_id = f.id
           WHERE fe.user_id = ? AND fe.entry_date = ?
           ORDER BY CASE fe.meal_type
                        WHEN 'breakfast' THEN 1
                        WHEN 'lunch' THEN 2
                        WHEN 'dinner' THEN 3
                        WHEN 'snack' THEN 4
                        ELSE 5
                    END,
                    fe.created_at""",
        (user_id, entry_date)
    ).fetchall()
    conn.close()
    return [dict(r) for r in rows]


def get_daily_totals(user_id: str, entry_date: str) -> dict:
    """Get daily nutrition totals (calories/macros summed over snapshots)."""
    conn = get_db()
    row = conn.execute(
        """SELECT COALESCE(SUM(snapshot_calories), 0) as total_calories,
                  COALESCE(SUM(snapshot_protein), 0) as total_protein,
                  COALESCE(SUM(snapshot_carbs), 0) as total_carbs,
                  COALESCE(SUM(snapshot_fat), 0) as total_fat,
                  COUNT(*) as entry_count
           FROM food_entries
           WHERE user_id = ? AND entry_date = ?""",
        (user_id, entry_date)
    ).fetchone()
    conn.close()
    return dict(row)


def get_goals_for_date(user_id: str, for_date: str) -> dict | None:
    """Get the goal for a user on a specific date.

    Uses date ranges, not is_active flag — historical goals remain queryable.
    The most recently started goal covering the date wins.
    """
    conn = get_db()
    row = conn.execute(
        """SELECT * FROM goals
           WHERE user_id = ? AND start_date <= ?
                 AND (end_date IS NULL OR end_date >= ?)
           ORDER BY start_date DESC
           LIMIT 1""",
        (user_id, for_date, for_date)
    ).fetchone()
    conn.close()
    if row:
        return dict(row)
    return None


def get_recent_foods(user_id: str, limit: int = 20) -> list:
    """Get recently logged foods for a user (deduplicated)."""
    conn = get_db()
    rows = conn.execute(
        """SELECT DISTINCT fe.food_id, fe.snapshot_food_name,
                  f.calories_per_base, f.protein_per_base, f.carbs_per_base,
                  f.fat_per_base, f.base_unit,
                  MAX(fe.created_at) as last_used
           FROM food_entries fe
           JOIN foods f ON fe.food_id = f.id
           WHERE fe.user_id = ?
AND fe.food_id IS NOT NULL AND f.status != 'archived' GROUP BY fe.food_id ORDER BY last_used DESC LIMIT ?""", (user_id, limit) ).fetchall() conn.close() return [dict(r) for r in rows] def get_frequent_foods(user_id: str, limit: int = 20) -> list: """Get most frequently logged foods for a user.""" conn = get_db() rows = conn.execute( """SELECT fe.food_id, fe.snapshot_food_name, f.calories_per_base, f.protein_per_base, f.carbs_per_base, f.fat_per_base, f.base_unit, COUNT(*) as use_count, MAX(fe.created_at) as last_used FROM food_entries fe JOIN foods f ON fe.food_id = f.id WHERE fe.user_id = ? AND fe.food_id IS NOT NULL AND f.status != 'archived' GROUP BY fe.food_id ORDER BY use_count DESC LIMIT ?""", (user_id, limit) ).fetchall() conn.close() return [dict(r) for r in rows] # ─── HTTP Handler ──────────────────────────────────────────────────────────── class CalorieHandler(BaseHTTPRequestHandler): """HTTP request handler for the Calorie Tracker API.""" def _get_user(self) -> dict | None: """Authenticate the request. 
Supports session cookie, Bearer token, or API key + telegram_id.""" # Check session cookie cookie = SimpleCookie(self.headers.get('Cookie', '')) if 'session' in cookie: user = get_user_from_session(cookie['session'].value) if user: return user # Check Authorization header auth = self.headers.get('Authorization', '') if auth.startswith('Bearer '): token = auth[7:] user = get_user_from_session(token) if user: return user # Check API key + telegram_user_id (service-to-service) api_key = self.headers.get('X-API-Key', '') telegram_id = self.headers.get('X-Telegram-User-Id', '') if api_key and telegram_id: user = get_user_from_api_key_and_telegram(api_key, telegram_id) if user: return user return None def _send_json(self, data, status=200): """Send a JSON response.""" body = json.dumps(data, default=str).encode('utf-8') self.send_response(status) self.send_header('Content-Type', 'application/json') self.send_header('Content-Length', len(body)) self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Headers', 'Content-Type, Authorization, X-API-Key, X-Telegram-User-Id') self.send_header('Access-Control-Allow-Methods', 'GET, POST, PATCH, PUT, DELETE, OPTIONS') self.end_headers() self.wfile.write(body) def _read_body(self) -> dict: """Read and parse JSON request body.""" length = int(self.headers.get('Content-Length', 0)) if length == 0: return {} body = self.rfile.read(length) return json.loads(body) def _require_auth(self) -> dict | None: """Require authentication; send 401 if not authenticated.""" user = self._get_user() if not user: self._send_json({'error': 'Unauthorized'}, 401) return None return user def do_OPTIONS(self): """Handle CORS preflight.""" self.send_response(204) self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Headers', 'Content-Type, Authorization, X-API-Key, X-Telegram-User-Id') self.send_header('Access-Control-Allow-Methods', 'GET, POST, PATCH, PUT, DELETE, OPTIONS') 
self.end_headers() def do_GET(self): parsed = urlparse(self.path) path = parsed.path.rstrip('/') params = parse_qs(parsed.query) # ── Public routes ── if path == '/api/health': return self._send_json({'status': 'ok'}) # ── Serve food images ── import mimetypes if path.startswith('/images/'): filename = path.split('/')[-1] filepath = IMAGES_DIR / filename if filepath.exists() and filepath.is_file(): mime = mimetypes.guess_type(str(filepath))[0] or 'image/jpeg' self.send_response(200) self.send_header('Content-Type', mime) self.send_header('Cache-Control', 'public, max-age=86400') data = filepath.read_bytes() self.send_header('Content-Length', len(data)) self.end_headers() self.wfile.write(data) return self.send_response(404) self.end_headers() return # ── Auth required routes ── user = self._require_auth() if not user: return # GET /api/user if path == '/api/user': return self._send_json(user) # GET /api/foods/search?q=... if path == '/api/foods/search': query = params.get('q', [''])[0] limit = int(params.get('limit', ['20'])[0]) if not query: return self._send_json([]) results = search_foods(query, user['id'], limit) return self._send_json(results) # GET /api/foods — list all foods if path == '/api/foods': limit = int(params.get('limit', ['100'])[0]) conn = get_db() rows = conn.execute( "SELECT * FROM foods WHERE status != 'archived' ORDER BY name COLLATE NOCASE LIMIT ?", (limit,) ).fetchall() conn.close() result = [] for r in rows: food = dict(r) food['servings'] = get_food_servings(food['id']) result.append(food) return self._send_json(result) # GET /api/foods/recent if path == '/api/foods/recent': limit = int(params.get('limit', ['20'])[0]) return self._send_json(get_recent_foods(user['id'], limit)) # GET /api/foods/frequent if path == '/api/foods/frequent': limit = int(params.get('limit', ['20'])[0]) return self._send_json(get_frequent_foods(user['id'], limit)) # GET /api/foods/external?q=... 
— search OpenFoodFacts + USDA if path == '/api/foods/external': query = params.get('q', [''])[0] limit = int(params.get('limit', ['10'])[0]) if not query: return self._send_json([]) results = search_external(query, limit) return self._send_json(results) # GET /api/foods/barcode/ — lookup by barcode barcode_match = re.match(r'^/api/foods/barcode/(\d+)$', path) if barcode_match: barcode = barcode_match.group(1) # Check local DB first conn = get_db() local = conn.execute("SELECT * FROM foods WHERE barcode = ? AND status != 'archived'", (barcode,)).fetchone() conn.close() if local: food = get_food_by_id(local['id']) return self._send_json({'source': 'local', 'food': food}) # Try OpenFoodFacts result = lookup_openfoodfacts_barcode(barcode) if result: return self._send_json({'source': 'openfoodfacts', 'external': result}) return self._send_json({'error': 'Barcode not found'}, 404) # GET /api/foods/ food_match = re.match(r'^/api/foods/([a-f0-9-]+)$', path) if food_match: food = get_food_by_id(food_match.group(1)) if food: return self._send_json(food) return self._send_json({'error': 'Not found'}, 404) # GET /api/entries?date=...&user_id=... if path == '/api/entries': entry_date = params.get('date', [date.today().isoformat()])[0] target_user = params.get('user_id', [user['id']])[0] entries = get_entries_by_date(target_user, entry_date) return self._send_json(entries) # GET /api/entries/totals?date=... if path == '/api/entries/totals': entry_date = params.get('date', [date.today().isoformat()])[0] target_user = params.get('user_id', [user['id']])[0] totals = get_daily_totals(target_user, entry_date) return self._send_json(totals) # GET /api/goals/for-date?date=...&user_id=... 
if path == '/api/goals/for-date': for_date = params.get('date', [date.today().isoformat()])[0] target_user = params.get('user_id', [user['id']])[0] goal = get_goals_for_date(target_user, for_date) if goal: return self._send_json(goal) return self._send_json({'error': 'No active goal found'}, 404) # GET /api/goals?user_id=... if path == '/api/goals': target_user = params.get('user_id', [user['id']])[0] conn = get_db() rows = conn.execute( "SELECT * FROM goals WHERE user_id = ? ORDER BY start_date DESC", (target_user,) ).fetchall() conn.close() return self._send_json([dict(r) for r in rows]) # GET /api/templates if path == '/api/templates': conn = get_db() templates = conn.execute( "SELECT * FROM meal_templates WHERE user_id = ? AND is_archived = 0 ORDER BY updated_at DESC", (user['id'],) ).fetchall() result = [] for t in templates: t_dict = dict(t) items = conn.execute( "SELECT * FROM meal_template_items WHERE template_id = ?", (t['id'],) ).fetchall() t_dict['items'] = [dict(i) for i in items] result.append(t_dict) conn.close() return self._send_json(result) # GET /api/favorites if path == '/api/favorites': conn = get_db() rows = conn.execute( """SELECT f.* FROM user_favorites uf JOIN foods f ON uf.food_id = f.id WHERE uf.user_id = ? AND f.status != 'archived' ORDER BY uf.created_at DESC""", (user['id'],) ).fetchall() conn.close() return self._send_json([dict(r) for r in rows]) # GET /api/resolution-queue if path == '/api/resolution-queue': conn = get_db() rows = conn.execute( """SELECT * FROM food_resolution_queue WHERE user_id = ? 
AND resolved_at IS NULL ORDER BY created_at DESC""", (user['id'],) ).fetchall() conn.close() return self._send_json([dict(r) for r in rows]) # GET /api/users (list all users — for switching view) if path == '/api/users': conn = get_db() rows = conn.execute("SELECT id, username, display_name FROM users").fetchall() conn.close() return self._send_json([dict(r) for r in rows]) self._send_json({'error': 'Not found'}, 404) def do_POST(self): parsed = urlparse(self.path) path = parsed.path.rstrip('/') # ── Login (no auth required) ── if path == '/api/auth/login': data = self._read_body() username = data.get('username', '').strip().lower() password = data.get('password', '') password_hash = hashlib.sha256(password.encode()).hexdigest() conn = get_db() user = conn.execute( "SELECT * FROM users WHERE username = ? AND password_hash = ?", (username, password_hash) ).fetchone() conn.close() if not user: return self._send_json({'error': 'Invalid credentials'}, 401) token = create_session(user['id']) self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_header('Set-Cookie', f'session={token}; Path=/; HttpOnly; SameSite=Lax; Max-Age=2592000') self.send_header('Access-Control-Allow-Origin', '*') body = json.dumps({'token': token, 'user': { 'id': user['id'], 'username': user['username'], 'display_name': user['display_name'] }}).encode() self.send_header('Content-Length', len(body)) self.end_headers() self.wfile.write(body) return # ── Auth required routes ── user = self._require_auth() if not user: return # POST /api/foods if path == '/api/foods': data = self._read_body() if not data.get('name'): return self._send_json({'error': 'name is required'}, 400) food = create_food(data, user['id']) return self._send_json(food, 201) # POST /api/foods/split — AI-powered multi-item splitting if path == '/api/foods/split': data = self._read_body() phrase = data.get('phrase', '').strip() if not phrase: return self._send_json({'error': 'phrase is required'}, 400) 
items = _ai_split_items(phrase) return self._send_json({'items': items}) # POST /api/foods/resolve if path == '/api/foods/resolve': data = self._read_body() raw_phrase = data.get('raw_phrase', '').strip() if not raw_phrase: return self._send_json({'error': 'raw_phrase is required'}, 400) result = resolve_food( raw_phrase=raw_phrase, user_id=user['id'], meal_type=data.get('meal_type'), portion_text=data.get('portion_text'), entry_date=data.get('entry_date'), source=data.get('source', 'api'), ) return self._send_json(result) # POST /api/images/search — Google Image Search for food photos if path == '/api/images/search': data = self._read_body() query = data.get('query', '') if not query: return self._send_json({'error': 'query required'}, 400) images = search_google_images(query, num=int(data.get('num', 6))) return self._send_json({'images': images}) # POST /api/foods//image — download and set image from URL image_set_match = re.match(r'^/api/foods/([a-f0-9-]+)/image$', path) if image_set_match: food_id = image_set_match.group(1) data = self._read_body() image_url = data.get('url') if not image_url: return self._send_json({'error': 'url required'}, 400) filename = download_food_image(image_url, food_id) if filename: return self._send_json({'success': True, 'image_path': filename}) return self._send_json({'success': False, 'error': 'Failed to download image'}) # POST /api/foods/import — import from external nutrition database result if path == '/api/foods/import': data = self._read_body() if not data.get('name') or not data.get('calories_per_100g'): return self._send_json({'error': 'name and calories_per_100g required'}, 400) food = import_external_food(data, user['id']) _audit_log(user['id'], 'food_created', 'food', food['id'], { 'source': data.get('source', 'external'), 'name': data['name'], 'barcode': data.get('barcode'), }) return self._send_json(food, 201) # POST /api/foods/merge if path == '/api/foods/merge': data = self._read_body() source_id = 
data.get('source_id') target_id = data.get('target_id') if not source_id or not target_id: return self._send_json({'error': 'source_id and target_id required'}, 400) result = merge_foods(source_id, target_id, user['id']) if 'error' in result: return self._send_json(result, 400) return self._send_json(result) # POST /api/entries if path == '/api/entries': data = self._read_body() if data.get('entry_type') != 'quick_add' and not data.get('food_id'): return self._send_json({'error': 'food_id is required (or use entry_type=quick_add)'}, 400) entry = create_food_entry(data, user['id']) if 'error' in entry: return self._send_json(entry, 400) return self._send_json(entry, 201) # POST /api/favorites if path == '/api/favorites': data = self._read_body() food_id = data.get('food_id') if not food_id: return self._send_json({'error': 'food_id required'}, 400) conn = get_db() try: conn.execute("INSERT INTO user_favorites (user_id, food_id) VALUES (?, ?)", (user['id'], food_id)) conn.commit() except sqlite3.IntegrityError: pass # Already a favorite conn.close() return self._send_json({'success': True}) # POST /api/templates if path == '/api/templates': data = self._read_body() template_id = str(uuid.uuid4()) conn = get_db() conn.execute( "INSERT INTO meal_templates (id, user_id, name, meal_type, is_favorite) VALUES (?, ?, ?, ?, ?)", (template_id, user['id'], data.get('name', 'Untitled'), data.get('meal_type'), 1 if data.get('is_favorite') else 0) ) for item in data.get('items', []): food = get_food_by_id(item['food_id']) if food: nutrition = calculate_entry_nutrition(food, item.get('quantity', 1), item.get('serving_id')) conn.execute( """INSERT INTO meal_template_items (id, template_id, food_id, quantity, unit, serving_description, snapshot_food_name, snapshot_calories, snapshot_protein, snapshot_carbs, snapshot_fat) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (str(uuid.uuid4()), template_id, item['food_id'], item.get('quantity', 1), item.get('unit', 'serving'), 
item.get('serving_description'), food['name'], nutrition['calories'], nutrition['protein'], nutrition['carbs'], nutrition['fat']) ) conn.commit() conn.close() return self._send_json({'id': template_id, 'name': data.get('name')}, 201) # POST /api/templates//log template_log_match = re.match(r'^/api/templates/([a-f0-9-]+)/log$', path) if template_log_match: template_id = template_log_match.group(1) data = self._read_body() meal_type = data.get('meal_type', 'snack') entry_date = data.get('entry_date', date.today().isoformat()) conn = get_db() items = conn.execute( "SELECT * FROM meal_template_items WHERE template_id = ?", (template_id,) ).fetchall() conn.close() if not items: return self._send_json({'error': 'Template not found or empty'}, 404) results = [] for item in items: entry = create_food_entry({ 'food_id': item['food_id'], 'meal_type': meal_type, 'entry_date': entry_date, 'quantity': item['quantity'], 'unit': item['unit'], 'serving_description': item['serving_description'], 'source': 'template', }, user['id']) results.append(entry) return self._send_json({'logged': len(results), 'entries': results}) # POST /api/resolution-queue//resolve resolve_match = re.match(r'^/api/resolution-queue/([a-f0-9-]+)/resolve$', path) if resolve_match: queue_id = resolve_match.group(1) data = self._read_body() action = data.get('action') # matched, created, dismissed food_id = data.get('food_id') conn = get_db() queue_item = conn.execute( "SELECT * FROM food_resolution_queue WHERE id = ?", (queue_id,) ).fetchone() if not queue_item: conn.close() return self._send_json({'error': 'Queue item not found'}, 404) conn.execute( """UPDATE food_resolution_queue SET resolved_food_id = ?, resolved_at = ?, resolution_action = ? 
WHERE id = ?""", (food_id, datetime.now().isoformat(), action, queue_id) ) result = {'success': True, 'action': action} # If matched or created, also create the food entry if action in ('matched', 'created') and food_id: entry = create_food_entry({ 'food_id': food_id, 'meal_type': queue_item['meal_type'] or 'snack', 'entry_date': queue_item['entry_date'] or date.today().isoformat(), 'quantity': queue_item['quantity'] or 1.0, 'unit': queue_item['unit'] or 'serving', 'source': queue_item['source'] or 'web', 'entry_method': 'search', 'raw_text': queue_item['raw_text'], 'confidence_score': queue_item['confidence'], }, user['id']) result['entry'] = entry conn.commit() conn.close() # Audit trail _audit_log(user['id'], 'queue_resolved', 'queue', queue_id, { 'action': action, 'food_id': food_id, 'raw_text': queue_item['raw_text'], 'confidence': queue_item['confidence'], }) return self._send_json(result) # POST /api/foods//aliases alias_match = re.match(r'^/api/foods/([a-f0-9-]+)/aliases$', path) if alias_match: food_id = alias_match.group(1) data = self._read_body() alias = data.get('alias', '').strip() if not alias: return self._send_json({'error': 'alias is required'}, 400) alias_id = str(uuid.uuid4()) conn = get_db() try: conn.execute( "INSERT INTO food_aliases (id, food_id, alias, alias_normalized) VALUES (?, ?, ?, ?)", (alias_id, food_id, alias, normalize_food_name(alias)) ) conn.commit() except sqlite3.IntegrityError: conn.close() return self._send_json({'error': 'Alias already exists'}, 409) conn.close() return self._send_json({'id': alias_id, 'alias': alias}, 201) self._send_json({'error': 'Not found'}, 404) def do_PATCH(self): parsed = urlparse(self.path) path = parsed.path.rstrip('/') user = self._require_auth() if not user: return # PATCH /api/entries/ entry_match = re.match(r'^/api/entries/([a-f0-9-]+)$', path) if entry_match: entry_id = entry_match.group(1) data = self._read_body() conn = get_db() entry = conn.execute("SELECT * FROM food_entries WHERE id = ? 
AND user_id = ?", (entry_id, user['id'])).fetchone() if not entry: conn.close() return self._send_json({'error': 'Entry not found'}, 404) updates = [] params = [] # Allow updating these fields for field in ['meal_type', 'entry_date', 'quantity', 'unit', 'serving_description', 'note']: if field in data: updates.append(f"{field} = ?") params.append(data[field]) # If quantity changed and it's a food entry, recalculate snapshot if 'quantity' in data and entry['food_id']: food = get_food_by_id(entry['food_id']) if food: quantity = data['quantity'] nutrition = calculate_entry_nutrition(food, quantity) updates.extend([ 'snapshot_calories = ?', 'snapshot_protein = ?', 'snapshot_carbs = ?', 'snapshot_fat = ?' ]) params.extend([nutrition['calories'], nutrition['protein'], nutrition['carbs'], nutrition['fat']]) if updates: params.append(entry_id) conn.execute(f"UPDATE food_entries SET {', '.join(updates)} WHERE id = ?", params) conn.commit() updated = conn.execute("SELECT * FROM food_entries WHERE id = ?", (entry_id,)).fetchone() conn.close() return self._send_json(dict(updated)) # PATCH /api/foods/ food_match = re.match(r'^/api/foods/([a-f0-9-]+)$', path) if food_match: food_id = food_match.group(1) data = self._read_body() conn = get_db() updates = [] params = [] for field in ['name', 'brand', 'barcode', 'notes', 'calories_per_base', 'protein_per_base', 'carbs_per_base', 'fat_per_base', 'base_unit', 'status', 'is_shared']: if field in data: if field == 'name': updates.append('name = ?') params.append(data['name']) updates.append('normalized_name = ?') params.append(normalize_food_name(data['name'])) elif field == 'brand': updates.append('brand = ?') params.append(data['brand']) updates.append('brand_normalized = ?') params.append(normalize_food_name(data['brand']) if data['brand'] else None) else: updates.append(f"{field} = ?") params.append(data[field]) updates.append("updated_at = ?") params.append(datetime.now().isoformat()) if updates: params.append(food_id) 
conn.execute(f"UPDATE foods SET {', '.join(updates)} WHERE id = ?", params) conn.commit() conn.close() return self._send_json(get_food_by_id(food_id)) self._send_json({'error': 'Not found'}, 404) def do_PUT(self): parsed = urlparse(self.path) path = parsed.path.rstrip('/') user = self._require_auth() if not user: return # PUT /api/goals if path == '/api/goals': data = self._read_body() target_user = data.get('user_id', user['id']) start_date = data.get('start_date', date.today().isoformat()) conn = get_db() # Deactivate/end previous active goals conn.execute( """UPDATE goals SET end_date = ?, is_active = 0 WHERE user_id = ? AND is_active = 1 AND end_date IS NULL""", (start_date, target_user) ) goal_id = str(uuid.uuid4()) conn.execute( """INSERT INTO goals (id, user_id, start_date, end_date, calories, protein, carbs, fat, is_active) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1)""", (goal_id, target_user, start_date, data.get('end_date'), data.get('calories', 2000), data.get('protein', 150), data.get('carbs', 200), data.get('fat', 65)) ) conn.commit() conn.close() return self._send_json({'id': goal_id, 'start_date': start_date}) self._send_json({'error': 'Not found'}, 404) def do_DELETE(self): parsed = urlparse(self.path) path = parsed.path.rstrip('/') user = self._require_auth() if not user: return # DELETE /api/entries/ entry_match = re.match(r'^/api/entries/([a-f0-9-]+)$', path) if entry_match: entry_id = entry_match.group(1) conn = get_db() conn.execute("DELETE FROM food_entries WHERE id = ? AND user_id = ?", (entry_id, user['id'])) conn.commit() conn.close() return self._send_json({'success': True}) # DELETE /api/favorites/ fav_match = re.match(r'^/api/favorites/([a-f0-9-]+)$', path) if fav_match: food_id = fav_match.group(1) conn = get_db() conn.execute("DELETE FROM user_favorites WHERE user_id = ? 
AND food_id = ?", (user['id'], food_id)) conn.commit() conn.close() return self._send_json({'success': True}) # DELETE /api/templates/ (soft delete — archive) template_match = re.match(r'^/api/templates/([a-f0-9-]+)$', path) if template_match: template_id = template_match.group(1) conn = get_db() conn.execute("UPDATE meal_templates SET is_archived = 1 WHERE id = ? AND user_id = ?", (template_id, user['id'])) conn.commit() conn.close() return self._send_json({'success': True}) # DELETE /api/foods/ (soft delete — archive) food_del_match = re.match(r'^/api/foods/([a-f0-9-]+)$', path) if food_del_match: food_id = food_del_match.group(1) conn = get_db() conn.execute("UPDATE foods SET status = 'archived', updated_at = ? WHERE id = ?", (datetime.now().isoformat(), food_id)) conn.commit() conn.close() _audit_log(user['id'], 'food_archived', 'food', food_id, {'action': 'deleted by user'}) return self._send_json({'success': True}) # DELETE /api/foods//aliases/ alias_del_match = re.match(r'^/api/foods/([a-f0-9-]+)/aliases/([a-f0-9-]+)$', path) if alias_del_match: alias_id = alias_del_match.group(2) conn = get_db() conn.execute("DELETE FROM food_aliases WHERE id = ?", (alias_id,)) conn.commit() conn.close() return self._send_json({'success': True}) self._send_json({'error': 'Not found'}, 404) def log_message(self, format, *args): """Suppress default logging for cleaner output.""" pass # ─── Main ──────────────────────────────────────────────────────────────────── if __name__ == '__main__': print(f"Initializing database at {DB_PATH}...") init_db() seed_default_users() print(f"Starting Calorie Tracker on port {PORT}...") server = HTTPServer(('0.0.0.0', PORT), CalorieHandler) try: server.serve_forever() except KeyboardInterrupt: print("\nShutting down.") server.server_close()