Files
platform/services/fitness/server.py

2694 lines
110 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
Calorie Tracker - Self-hosted calorie & macro tracker with SQLite backend
Replaces SparkyFitness with fuzzy food matching, confidence-based resolve, and AI intake support.
"""
import os
import json
import sqlite3
import uuid
import secrets
import bcrypt
import re
import unicodedata
from http.server import HTTPServer, BaseHTTPRequestHandler
from http.cookies import SimpleCookie
from datetime import datetime, date, timedelta
from pathlib import Path
from urllib.parse import urlparse, parse_qs
from difflib import SequenceMatcher
import urllib.request
import urllib.error
import logging
logger = logging.getLogger(__name__)
# Configuration
PORT = int(os.environ.get("PORT", 8095))
DATA_DIR = Path(os.environ.get("DATA_DIR", "/app/data"))
DB_PATH = DATA_DIR / "calories.db"
IMAGES_DIR = DATA_DIR / "images"
API_KEY = os.environ.get("CALORIES_API_KEY", "") # For service-to-service (Telegram)
USDA_API_KEY = os.environ.get("USDA_API_KEY", "") # Free from https://fdc.nal.usda.gov/api-key-signup.html
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "") # For AI nutrition estimation
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini") # Fast + cheap for estimation
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "") # For food image search
GOOGLE_CX = os.environ.get("GOOGLE_CX", "") # Google Custom Search engine ID
# Ensure directories exist
DATA_DIR.mkdir(parents=True, exist_ok=True)
IMAGES_DIR.mkdir(parents=True, exist_ok=True)
# ─── Database ────────────────────────────────────────────────────────────────
def get_db():
"""Get database connection."""
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("PRAGMA journal_mode = WAL")
return conn
def normalize_food_name(name: str) -> str:
"""Normalize a food name for matching: lowercase, strip, collapse whitespace, remove accents."""
if not name:
return ""
# Remove accents
name = unicodedata.normalize('NFKD', name).encode('ascii', 'ignore').decode('ascii')
# Lowercase, strip, collapse whitespace
name = re.sub(r'\s+', ' ', name.lower().strip())
# Remove common filler words for matching
name = re.sub(r'\b(the|a|an|of|with|and|in)\b', '', name).strip()
name = re.sub(r'\s+', ' ', name)
return name
def _naive_singularize(name: str) -> str:
"""Strip common plural suffixes for search retry. Conservative — only trailing 's'."""
lower = name.lower().strip()
if lower.endswith('ies'):
return name[:-3] + 'y' # berries -> berry
if lower.endswith('s') and not lower.endswith('ss') and len(lower) > 3:
return name[:-1]
return name
def tokenize_food_name(name: str) -> set:
"""Break a normalized food name into tokens for overlap matching."""
return set(normalize_food_name(name).split())
def similarity_score(a: str, b: str) -> float:
"""Compute similarity between two food names (0.0 to 1.0)."""
norm_a = normalize_food_name(a)
norm_b = normalize_food_name(b)
# Exact normalized match
if norm_a == norm_b:
return 1.0
# Token overlap (Jaccard)
tokens_a = tokenize_food_name(a)
tokens_b = tokenize_food_name(b)
if tokens_a and tokens_b:
jaccard = len(tokens_a & tokens_b) / len(tokens_a | tokens_b)
else:
jaccard = 0.0
# Sequence similarity
seq = SequenceMatcher(None, norm_a, norm_b).ratio()
# Weighted combination
return max(jaccard * 0.6 + seq * 0.4, seq)
def init_db():
"""Initialize database schema."""
conn = get_db()
cursor = conn.cursor()
# ── Users ──
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
display_name TEXT NOT NULL,
telegram_user_id TEXT UNIQUE,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# ── Sessions ──
cursor.execute('''
CREATE TABLE IF NOT EXISTS sessions (
token TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
expires_at TEXT NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
)
''')
# ── Foods (master records, nutrition per_100g as base) ──
cursor.execute('''
CREATE TABLE IF NOT EXISTS foods (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
normalized_name TEXT NOT NULL,
brand TEXT,
brand_normalized TEXT,
barcode TEXT,
notes TEXT,
-- Nutrition per base unit (per_100g for weight-based, per serving for countable items)
calories_per_base REAL NOT NULL DEFAULT 0,
protein_per_base REAL NOT NULL DEFAULT 0,
carbs_per_base REAL NOT NULL DEFAULT 0,
fat_per_base REAL NOT NULL DEFAULT 0,
-- Base unit: "100g" for weight-based foods, or "piece"/"slice"/"serving" etc for countable
base_unit TEXT NOT NULL DEFAULT '100g',
-- Status: confirmed, ai_created, needs_review, archived
status TEXT NOT NULL DEFAULT 'confirmed',
created_by_user_id TEXT,
is_shared INTEGER NOT NULL DEFAULT 1,
image_path TEXT, -- Filename in data/images/
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (created_by_user_id) REFERENCES users(id)
)
''')
# ── Food servings (named portion definitions for a food) ──
cursor.execute('''
CREATE TABLE IF NOT EXISTS food_servings (
id TEXT PRIMARY KEY,
food_id TEXT NOT NULL,
name TEXT NOT NULL, -- e.g. "1 cup", "1 slice", "small bowl", "medium plate"
-- How much of the base unit this serving represents
-- For base_unit=100g: amount_in_base=1.5 means 150g
-- For base_unit=piece: amount_in_base=1 means 1 piece
amount_in_base REAL NOT NULL DEFAULT 1.0,
is_default INTEGER NOT NULL DEFAULT 0,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (food_id) REFERENCES foods(id) ON DELETE CASCADE
)
''')
# ── Food aliases (for fuzzy matching) ──
cursor.execute('''
CREATE TABLE IF NOT EXISTS food_aliases (
id TEXT PRIMARY KEY,
food_id TEXT NOT NULL,
alias TEXT NOT NULL,
alias_normalized TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (food_id) REFERENCES foods(id) ON DELETE CASCADE,
UNIQUE(alias_normalized)
)
''')
# ── Food entries (daily log with immutable snapshots) ──
cursor.execute('''
CREATE TABLE IF NOT EXISTS food_entries (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
food_id TEXT, -- Reference to food (nullable for quick-add)
-- Meal type: breakfast, lunch, dinner, snack
meal_type TEXT NOT NULL,
entry_date TEXT NOT NULL, -- YYYY-MM-DD
-- Entry type: food (normal), quick_add (calories only)
entry_type TEXT NOT NULL DEFAULT 'food',
-- Quantity and unit at log time
quantity REAL NOT NULL DEFAULT 1.0,
unit TEXT NOT NULL DEFAULT 'serving',
serving_description TEXT, -- e.g. "1 medium bowl", "2 slices"
-- IMMUTABLE SNAPSHOT: full nutrition + context at time of logging
snapshot_food_name TEXT NOT NULL,
snapshot_serving_label TEXT, -- e.g. "1 breast (170g)", "1 cup"
snapshot_grams REAL, -- estimated grams if known
snapshot_calories REAL NOT NULL DEFAULT 0,
snapshot_protein REAL NOT NULL DEFAULT 0,
snapshot_carbs REAL NOT NULL DEFAULT 0,
snapshot_fat REAL NOT NULL DEFAULT 0,
-- Source & method
source TEXT NOT NULL DEFAULT 'web', -- where: web, telegram, api
entry_method TEXT NOT NULL DEFAULT 'manual', -- how: manual, search, template, ai_plate, ai_label, quick_add
raw_text TEXT, -- Original text from Telegram/AI
confidence_score REAL, -- 0.0 to 1.0 from resolve
note TEXT, -- User note
image_ref TEXT, -- Reference to uploaded image (AI photo logging)
-- AI metadata (JSON blob if from AI)
ai_metadata TEXT,
-- Idempotency: prevent duplicate entries from retries
idempotency_key TEXT UNIQUE,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id),
FOREIGN KEY (food_id) REFERENCES foods(id)
)
''')
# ── Goals (date-ranged) ──
cursor.execute('''
CREATE TABLE IF NOT EXISTS goals (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
start_date TEXT NOT NULL, -- YYYY-MM-DD
end_date TEXT, -- NULL = still active
calories REAL NOT NULL DEFAULT 2000,
protein REAL NOT NULL DEFAULT 150,
carbs REAL NOT NULL DEFAULT 200,
fat REAL NOT NULL DEFAULT 65,
is_active INTEGER NOT NULL DEFAULT 1,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
)
''')
# ── Meal templates ──
cursor.execute('''
CREATE TABLE IF NOT EXISTS meal_templates (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
name TEXT NOT NULL,
meal_type TEXT, -- Optional default meal type
is_favorite INTEGER NOT NULL DEFAULT 0,
is_archived INTEGER NOT NULL DEFAULT 0,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS meal_template_items (
id TEXT PRIMARY KEY,
template_id TEXT NOT NULL,
food_id TEXT NOT NULL,
quantity REAL NOT NULL DEFAULT 1.0,
unit TEXT NOT NULL DEFAULT 'serving',
serving_description TEXT,
-- Snapshot for display (so template shows correct info even if food changes)
snapshot_food_name TEXT NOT NULL,
snapshot_calories REAL NOT NULL DEFAULT 0,
snapshot_protein REAL NOT NULL DEFAULT 0,
snapshot_carbs REAL NOT NULL DEFAULT 0,
snapshot_fat REAL NOT NULL DEFAULT 0,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (template_id) REFERENCES meal_templates(id) ON DELETE CASCADE,
FOREIGN KEY (food_id) REFERENCES foods(id)
)
''')
# ── Food resolution queue (for low-confidence matches) ──
cursor.execute('''
CREATE TABLE IF NOT EXISTS food_resolution_queue (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
raw_text TEXT NOT NULL,
proposed_food_id TEXT,
candidates_json TEXT, -- JSON array of {food_id, name, score}
confidence REAL NOT NULL DEFAULT 0,
meal_type TEXT,
entry_date TEXT,
quantity REAL,
unit TEXT,
source TEXT,
-- Resolution
resolved_food_id TEXT,
resolved_at TEXT,
resolution_action TEXT, -- matched, created, merged, dismissed
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id),
FOREIGN KEY (proposed_food_id) REFERENCES foods(id),
FOREIGN KEY (resolved_food_id) REFERENCES foods(id)
)
''')
# ── User favorites ──
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_favorites (
user_id TEXT NOT NULL,
food_id TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (user_id, food_id),
FOREIGN KEY (user_id) REFERENCES users(id),
FOREIGN KEY (food_id) REFERENCES foods(id) ON DELETE CASCADE
)
''')
# ── Audit log (merges, resolutions, AI edits) ──
cursor.execute('''
CREATE TABLE IF NOT EXISTS audit_log (
id TEXT PRIMARY KEY,
user_id TEXT,
action TEXT NOT NULL, -- food_merged, queue_resolved, entry_ai_edited, food_archived, food_created
entity_type TEXT NOT NULL, -- food, entry, queue, template
entity_id TEXT,
details TEXT, -- JSON blob with action-specific details
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
)
''')
# ── FTS5 virtual table for food search ──
cursor.execute('''
CREATE VIRTUAL TABLE IF NOT EXISTS foods_fts USING fts5(
name,
normalized_name,
brand,
content=foods,
content_rowid=rowid
)
''')
# ── FTS5 for aliases ──
cursor.execute('''
CREATE VIRTUAL TABLE IF NOT EXISTS aliases_fts USING fts5(
alias,
alias_normalized,
content=food_aliases,
content_rowid=rowid
)
''')
# ── Indexes ──
cursor.execute('CREATE INDEX IF NOT EXISTS idx_foods_normalized ON foods(normalized_name)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_foods_status ON foods(status)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_foods_brand_norm ON foods(brand_normalized)')
cursor.execute('CREATE UNIQUE INDEX IF NOT EXISTS idx_foods_barcode_unique ON foods(barcode) WHERE barcode IS NOT NULL')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_food_aliases_normalized ON food_aliases(alias_normalized)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_food_entries_user_date ON food_entries(user_id, entry_date)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_food_entries_user_meal ON food_entries(user_id, meal_type)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_food_entries_food ON food_entries(food_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_food_entries_food_date ON food_entries(food_id, entry_date)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_food_entries_idempotency ON food_entries(idempotency_key) WHERE idempotency_key IS NOT NULL')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_goals_user_date ON goals(user_id, start_date)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_goals_user_active ON goals(user_id, is_active)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_resolution_queue_user ON food_resolution_queue(user_id, resolved_at)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_user_telegram ON users(telegram_user_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_audit_entity ON audit_log(entity_type, entity_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_templates_user ON meal_templates(user_id, is_archived)')
# ── Triggers to keep FTS5 in sync ──
cursor.executescript('''
CREATE TRIGGER IF NOT EXISTS foods_ai AFTER INSERT ON foods BEGIN
INSERT INTO foods_fts(rowid, name, normalized_name, brand)
VALUES (new.rowid, new.name, new.normalized_name, new.brand);
END;
CREATE TRIGGER IF NOT EXISTS foods_ad AFTER DELETE ON foods BEGIN
INSERT INTO foods_fts(foods_fts, rowid, name, normalized_name, brand)
VALUES ('delete', old.rowid, old.name, old.normalized_name, old.brand);
END;
CREATE TRIGGER IF NOT EXISTS foods_au AFTER UPDATE ON foods BEGIN
INSERT INTO foods_fts(foods_fts, rowid, name, normalized_name, brand)
VALUES ('delete', old.rowid, old.name, old.normalized_name, old.brand);
INSERT INTO foods_fts(rowid, name, normalized_name, brand)
VALUES (new.rowid, new.name, new.normalized_name, new.brand);
END;
CREATE TRIGGER IF NOT EXISTS aliases_ai AFTER INSERT ON food_aliases BEGIN
INSERT INTO aliases_fts(rowid, alias, alias_normalized)
VALUES (new.rowid, new.alias, new.alias_normalized);
END;
CREATE TRIGGER IF NOT EXISTS aliases_ad AFTER DELETE ON food_aliases BEGIN
INSERT INTO aliases_fts(aliases_fts, rowid, alias, alias_normalized)
VALUES ('delete', old.rowid, old.alias, old.alias_normalized);
END;
CREATE TRIGGER IF NOT EXISTS aliases_au AFTER UPDATE ON food_aliases BEGIN
INSERT INTO aliases_fts(aliases_fts, rowid, alias, alias_normalized)
VALUES ('delete', old.rowid, old.alias, old.alias_normalized);
INSERT INTO aliases_fts(rowid, alias, alias_normalized)
VALUES (new.rowid, new.alias, new.alias_normalized);
END;
''')
# Migration: add image_path to foods if missing
try:
cursor.execute("ALTER TABLE foods ADD COLUMN image_path TEXT")
conn.commit()
except:
pass
conn.commit()
conn.close()
# ─── Food image search & download ───────────────────────────────────────────
def search_google_images(query: str, num: int = 6) -> list:
"""Search Google Images for food photos."""
if not GOOGLE_API_KEY or not GOOGLE_CX:
return []
url = (f"https://www.googleapis.com/customsearch/v1"
f"?key={GOOGLE_API_KEY}&cx={GOOGLE_CX}&searchType=image"
f"&q={urllib.parse.quote(query + ' food')}&num={num}")
try:
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode())
return [
{'url': item.get('link'), 'thumbnail': item.get('image', {}).get('thumbnailLink'), 'title': item.get('title')}
for item in data.get('items', [])
]
except Exception as e:
logger.warning(f"Google image search failed: {e}")
return []
def download_food_image(image_url: str, food_id: str) -> str | None:
"""Download an image from URL and save to data/images/. Returns filename."""
try:
req = urllib.request.Request(image_url, headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "image/*,*/*",
"Referer": "https://www.google.com/",
})
with urllib.request.urlopen(req, timeout=15) as resp:
image_bytes = resp.read()
content_type = resp.headers.get("Content-Type", "image/jpeg")
ext_map = {"image/jpeg": ".jpg", "image/png": ".png", "image/gif": ".gif", "image/webp": ".webp"}
ext = ext_map.get(content_type.split(";")[0], ".jpg")
# Unique filename each time so browser cache doesn't serve stale image
filename = f"{food_id}_{uuid.uuid4().hex[:8]}{ext}"
filepath = IMAGES_DIR / filename
# Delete old image file if exists
conn = get_db()
old = conn.execute("SELECT image_path FROM foods WHERE id = ?", (food_id,)).fetchone()
if old and old['image_path']:
old_path = IMAGES_DIR / old['image_path']
if old_path.exists():
old_path.unlink()
with open(filepath, "wb") as f:
f.write(image_bytes)
# Update food record
conn.execute("UPDATE foods SET image_path = ? WHERE id = ?", (filename, food_id))
conn.commit()
conn.close()
return filename
except Exception as e:
logger.warning(f"Failed to download food image: {e}")
return None
def auto_fetch_food_image(food_id: str, food_name: str, brand: str = None):
"""Auto-fetch an image for a food using Google Image Search. Runs in background."""
import threading
def _fetch():
query = f"{brand} {food_name}" if brand else food_name
images = search_google_images(query, num=6)
# Try full-size URLs first (sharp images), thumbnail as last resort
for img in images:
url = img.get('url')
if url:
result = download_food_image(url, food_id)
if result:
return
# All full URLs failed — fall back to thumbnails
for img in images:
thumb = img.get('thumbnail')
if thumb:
result = download_food_image(thumb, food_id)
if result:
return
threading.Thread(target=_fetch, daemon=True).start()
def seed_default_users():
"""Create default users if they don't exist."""
conn = get_db()
cursor = conn.cursor()
users = [
{
"id": str(uuid.uuid4()),
"username": os.environ.get("USER1_USERNAME", "yusuf"),
"password": os.environ.get("USER1_PASSWORD", "changeme"),
"display_name": os.environ.get("USER1_DISPLAY_NAME", "Yusuf"),
"telegram_user_id": os.environ.get("USER1_TELEGRAM_ID"),
},
{
"id": str(uuid.uuid4()),
"username": os.environ.get("USER2_USERNAME", "madiha"),
"password": os.environ.get("USER2_PASSWORD", "changeme"),
"display_name": os.environ.get("USER2_DISPLAY_NAME", "Madiha"),
"telegram_user_id": os.environ.get("USER2_TELEGRAM_ID"),
},
]
for user in users:
existing = cursor.execute("SELECT id FROM users WHERE username = ?", (user["username"],)).fetchone()
if not existing:
password_hash = bcrypt.hashpw(user["password"].encode(), bcrypt.gensalt()).decode()
cursor.execute(
"INSERT INTO users (id, username, password_hash, display_name, telegram_user_id) VALUES (?, ?, ?, ?, ?)",
(user["id"], user["username"], password_hash, user["display_name"], user["telegram_user_id"])
)
conn.commit()
conn.close()
# ─── Auth helpers ────────────────────────────────────────────────────────────
def create_session(user_id: str) -> str:
"""Create a new session token for a user."""
token = secrets.token_urlsafe(32)
expires = (datetime.now() + timedelta(days=30)).isoformat()
conn = get_db()
conn.execute("INSERT INTO sessions (token, user_id, expires_at) VALUES (?, ?, ?)",
(token, user_id, expires))
conn.commit()
conn.close()
return token
def get_user_from_session(token: str) -> dict | None:
"""Get user from session token."""
if not token:
return None
conn = get_db()
row = conn.execute('''
SELECT u.id, u.username, u.display_name, u.telegram_user_id
FROM sessions s JOIN users u ON s.user_id = u.id
WHERE s.token = ? AND s.expires_at > ?
''', (token, datetime.now().isoformat())).fetchone()
conn.close()
if row:
return dict(row)
return None
def get_user_from_api_key_and_telegram(api_key: str, telegram_user_id: str) -> dict | None:
"""Authenticate via API key + Telegram user ID (for service-to-service)."""
if not API_KEY or api_key != API_KEY:
return None
conn = get_db()
row = conn.execute(
"SELECT id, username, display_name, telegram_user_id FROM users WHERE telegram_user_id = ?",
(telegram_user_id,)
).fetchone()
conn.close()
if row:
return dict(row)
return None
# ─── Food search & resolve engine ────────────────────────────────────────────
def search_foods(query: str, user_id: str = None, limit: int = 20) -> list:
"""
Layered food search:
1. Exact normalized match
2. Alias exact match
3. Tokenized/contains match
4. FTS5 candidate retrieval
5. Similarity scoring over top candidates
"""
conn = get_db()
norm_query = normalize_food_name(query)
query_tokens = tokenize_food_name(query)
candidates = {} # food_id -> {food_data, score, match_type}
# Layer 1: Exact normalized match
rows = conn.execute(
"SELECT * FROM foods WHERE normalized_name = ? AND status != 'archived'",
(norm_query,)
).fetchall()
for row in rows:
food = dict(row)
candidates[food['id']] = {'food': food, 'score': 1.0, 'match_type': 'exact'}
# Layer 2: Alias exact match
alias_rows = conn.execute(
"SELECT fa.food_id, f.* FROM food_aliases fa JOIN foods f ON fa.food_id = f.id "
"WHERE fa.alias_normalized = ? AND f.status != 'archived'",
(norm_query,)
).fetchall()
for row in alias_rows:
food = dict(row)
fid = food['food_id']
if fid not in candidates:
candidates[fid] = {'food': food, 'score': 0.95, 'match_type': 'alias_exact'}
# Layer 3: Tokenized/contains match (all query tokens appear in food name)
if query_tokens and len(candidates) < limit:
like_clauses = " AND ".join(["normalized_name LIKE ?" for _ in query_tokens])
like_params = [f"%{t}%" for t in query_tokens]
rows = conn.execute(
f"SELECT * FROM foods WHERE {like_clauses} AND status != 'archived' LIMIT ?",
like_params + [limit]
).fetchall()
for row in rows:
food = dict(row)
if food['id'] not in candidates:
score = similarity_score(query, food['name'])
candidates[food['id']] = {'food': food, 'score': max(score, 0.7), 'match_type': 'token_match'}
# Layer 4: FTS5 search
if len(candidates) < limit:
# Build FTS query: each token as a prefix match
fts_terms = " OR ".join([f'"{t}"*' for t in query_tokens if t])
if fts_terms:
try:
fts_rows = conn.execute(
"SELECT f.* FROM foods_fts fts JOIN foods f ON f.rowid = fts.rowid "
"WHERE foods_fts MATCH ? AND f.status != 'archived' LIMIT ?",
(fts_terms, limit * 2)
).fetchall()
for row in fts_rows:
food = dict(row)
if food['id'] not in candidates:
score = similarity_score(query, food['name'])
candidates[food['id']] = {'food': food, 'score': score, 'match_type': 'fts'}
except sqlite3.OperationalError:
pass # FTS query syntax error, skip
# Also search aliases FTS
try:
alias_fts_rows = conn.execute(
"SELECT fa.food_id, f.* FROM aliases_fts afts "
"JOIN food_aliases fa ON fa.rowid = afts.rowid "
"JOIN foods f ON fa.food_id = f.id "
"WHERE aliases_fts MATCH ? AND f.status != 'archived' LIMIT ?",
(fts_terms, limit)
).fetchall()
for row in alias_fts_rows:
food = dict(row)
fid = food.get('food_id', food['id'])
if fid not in candidates:
score = similarity_score(query, food['name'])
candidates[fid] = {'food': food, 'score': max(score, 0.5), 'match_type': 'alias_fts'}
except sqlite3.OperationalError:
pass
conn.close()
# Sort by score descending and return top N
sorted_candidates = sorted(candidates.values(), key=lambda x: x['score'], reverse=True)[:limit]
results = []
for c in sorted_candidates:
food = c['food']
# Get servings for this food
servings = get_food_servings(food['id'])
results.append({
'id': food['id'],
'name': food['name'],
'brand': food.get('brand'),
'base_unit': food.get('base_unit', '100g'),
'calories_per_base': food.get('calories_per_base', 0),
'protein_per_base': food.get('protein_per_base', 0),
'carbs_per_base': food.get('carbs_per_base', 0),
'fat_per_base': food.get('fat_per_base', 0),
'status': food.get('status', 'confirmed'),
'image_path': food.get('image_path'),
'servings': servings,
'score': round(c['score'], 3),
'match_type': c['match_type'],
})
return results
# ─── External nutrition lookup (OpenFoodFacts + USDA) ────────────────────────
def _http_get_json(url: str, timeout: int = 15) -> dict | None:
"""Make a GET request and return parsed JSON, or None on failure."""
try:
req = urllib.request.Request(url, headers={'User-Agent': 'CalorieTracker/1.0'})
with urllib.request.urlopen(req, timeout=timeout) as resp:
return json.loads(resp.read().decode('utf-8'))
except (urllib.error.URLError, json.JSONDecodeError, TimeoutError, OSError) as e:
logger.debug(f"External lookup failed for {url}: {e}")
return None
def search_openfoodfacts(query: str, limit: int = 5) -> list:
"""Search OpenFoodFacts for food products using v2 API.
Returns list of {name, brand, barcode, calories_per_100g, protein, carbs, fat, serving_size, serving_unit, source}
"""
encoded = urllib.parse.quote(query)
# v2 search API — more reliable than the old cgi endpoint
# Try category search first (best for branded products)
url = (f"https://world.openfoodfacts.org/api/v2/search?"
f"categories_tags_en={encoded}&"
f"fields=product_name,brands,code,nutriments,serving_size,serving_quantity&"
f"page_size={limit}&sort_by=unique_scans_n")
data = _http_get_json(url)
# Fallback: v2 text search (search_terms param)
if not data or not data.get('products'):
url = (f"https://world.openfoodfacts.org/api/v2/search?"
f"search_terms={encoded}&"
f"fields=product_name,brands,code,nutriments,serving_size,serving_quantity&"
f"page_size={limit}&sort_by=unique_scans_n")
data = _http_get_json(url)
if not data or 'products' not in data:
return []
results = []
for p in data['products'][:limit]:
nuts = p.get('nutriments', {})
cal = nuts.get('energy-kcal_100g') or 0
# If no kcal, try kJ and convert
if not cal:
kj = nuts.get('energy_100g') or 0
if kj:
cal = round(float(kj) / 4.184, 1)
name = p.get('product_name', '').strip()
if not name or not cal:
continue
results.append({
'name': name,
'brand': (p.get('brands') or '').split(',')[0].strip() or None,
'barcode': p.get('code'),
'calories_per_100g': round(float(cal), 1),
'protein_per_100g': round(float(nuts.get('proteins_100g', 0) or 0), 1),
'carbs_per_100g': round(float(nuts.get('carbohydrates_100g', 0) or 0), 1),
'fat_per_100g': round(float(nuts.get('fat_100g', 0) or 0), 1),
'serving_size_text': p.get('serving_size'),
'serving_grams': p.get('serving_quantity'),
'source': 'openfoodfacts',
})
return results
def lookup_openfoodfacts_barcode(barcode: str) -> dict | None:
"""Look up a specific product by barcode on OpenFoodFacts."""
url = f"https://world.openfoodfacts.org/api/v2/product/{barcode}.json?fields=product_name,brands,code,nutriments,serving_size,serving_quantity"
data = _http_get_json(url)
if not data or data.get('status') != 1 or 'product' not in data:
return None
p = data['product']
nuts = p.get('nutriments', {})
cal = nuts.get('energy-kcal_100g', 0) or nuts.get('energy_100g', 0)
if cal and not nuts.get('energy-kcal_100g'):
cal = round(cal / 4.184, 1)
name = p.get('product_name', '').strip()
if not name:
return None
return {
'name': name,
'brand': (p.get('brands') or '').split(',')[0].strip() or None,
'barcode': p.get('code'),
'calories_per_100g': round(float(cal), 1),
'protein_per_100g': round(float(nuts.get('proteins_100g', 0) or 0), 1),
'carbs_per_100g': round(float(nuts.get('carbohydrates_100g', 0) or 0), 1),
'fat_per_100g': round(float(nuts.get('fat_100g', 0) or 0), 1),
'serving_size_text': p.get('serving_size'),
'serving_grams': p.get('serving_quantity'),
'source': 'openfoodfacts',
}
def search_usda(query: str, limit: int = 5) -> list:
"""Search USDA FoodData Central for foods.
Uses USDA_API_KEY env var, or falls back to DEMO_KEY (rate-limited).
Free key: https://fdc.nal.usda.gov/api-key-signup.html
"""
api_key = USDA_API_KEY or 'DEMO_KEY'
encoded = urllib.parse.quote(query)
url = f"https://api.nal.usda.gov/fdc/v1/foods/search?api_key={api_key}&query={encoded}&pageSize={limit}&dataType=Foundation,SR%20Legacy"
data = _http_get_json(url)
if not data or 'foods' not in data:
return []
results = []
for food in data['foods'][:limit]:
# Build nutrient map with IDs for disambiguation
nutrient_by_id = {}
nutrient_by_name = {}
for n in food.get('foodNutrients', []):
nutrient_by_id[n.get('nutrientId')] = n.get('value', 0)
nutrient_by_name[n.get('nutrientName', '')] = n.get('value', 0)
# Energy: prefer kcal (nutrientId=1008) over kJ (nutrientId=1062)
cal = nutrient_by_id.get(1008, 0) # Energy (kcal)
if not cal:
kj = nutrient_by_id.get(1062, 0) # Energy (kJ)
if kj:
cal = round(kj / 4.184, 1)
if not cal:
# Fallback: calculate from macros (4cal/g protein, 4cal/g carbs, 9cal/g fat)
protein = nutrient_by_name.get('Protein', 0) or 0
carbs = nutrient_by_name.get('Carbohydrate, by difference', 0) or 0
fat = nutrient_by_name.get('Total lipid (fat)', 0) or 0
cal = round(protein * 4 + carbs * 4 + fat * 9, 1)
protein = nutrient_by_name.get('Protein', 0) or 0
carbs = nutrient_by_name.get('Carbohydrate, by difference', 0) or 0
fat = nutrient_by_name.get('Total lipid (fat)', 0) or 0
name = food.get('description', '').strip()
if not name or (not cal and not protein):
continue
# USDA names are uppercase, clean them up
name = name.title()
results.append({
'name': name,
'brand': food.get('brandName'),
'barcode': food.get('gtinUpc'),
'calories_per_100g': round(float(cal), 1),
'protein_per_100g': round(float(protein), 1),
'carbs_per_100g': round(float(carbs), 1),
'fat_per_100g': round(float(fat), 1),
'serving_size_text': None,
'serving_grams': None,
'source': 'usda',
'usda_fdc_id': food.get('fdcId'),
})
return results
def search_external(query: str, limit: int = 5) -> list:
"""Search all external nutrition databases, deduplicate, and rank results."""
all_results = []
# Query both sources
off_results = search_openfoodfacts(query, limit)
usda_results = search_usda(query, limit)
all_results.extend(off_results)
all_results.extend(usda_results)
if not all_results:
return []
# Score by name similarity to query and deduplicate
seen_names = set()
scored = []
for r in all_results:
norm = normalize_food_name(r['name'])
if norm in seen_names:
continue
seen_names.add(norm)
score = similarity_score(query, r['name'])
r['relevance_score'] = round(score, 3)
scored.append(r)
# Sort by relevance
scored.sort(key=lambda x: x['relevance_score'], reverse=True)
return scored[:limit]
def import_external_food(external_result: dict, user_id: str) -> dict:
"""Import a food from an external nutrition database into the local DB."""
serving_grams = external_result.get('serving_grams')
serving_text = external_result.get('serving_size_text')
servings = [{'name': '100g', 'amount_in_base': 1.0, 'is_default': not serving_grams}]
if serving_grams and serving_text:
servings.append({
'name': serving_text,
'amount_in_base': round(float(serving_grams) / 100, 2),
'is_default': True,
})
food = create_food({
'name': external_result['name'],
'brand': external_result.get('brand'),
'barcode': external_result.get('barcode'),
'calories_per_base': external_result['calories_per_100g'],
'protein_per_base': external_result['protein_per_100g'],
'carbs_per_base': external_result['carbs_per_100g'],
'fat_per_base': external_result['fat_per_100g'],
'base_unit': '100g',
'status': 'confirmed',
'notes': f"Imported from {external_result.get('source', 'external')}",
'servings': servings,
}, user_id)
return food
# ─── Natural language parsing ────────────────────────────────────────────────
def parse_food_request(raw_text: str) -> dict:
"""Parse a natural language food logging request.
Extracts: food_description, meal_type, quantity, unit, modifiers, exclusions
Examples:
'Log a single scoop hot fudge sundae from braums'
-> food='hot fudge sundae', brand='braums', quantity=1, unit='scoop', meal_type=None
'Log 2 mince tacos for lunch. No sour cream. Just mince, cheese and lettuce'
-> food='mince tacos', quantity=2, meal_type='lunch', modifiers='just mince, cheese and lettuce', exclusions='no sour cream'
'Add as a snack a scoop of vanilla ice cream. Not the strawberry shake'
-> food='vanilla ice cream', quantity=1, unit='scoop', meal_type='snack', exclusions='not the strawberry shake'
"""
text = raw_text.strip()
result = {
'food_description': text,
'meal_type': None,
'quantity': 1.0,
'unit': 'serving',
'brand': None,
'modifiers': None,
'exclusions': None,
'original_text': text,
}
# Strip command prefixes: "log", "add", "track", "record"
text = re.sub(r'^(?:log|add|track|record)\s+', '', text, flags=re.IGNORECASE).strip()
# Extract meal type from text
meal_patterns = [
r'\bfor\s+(breakfast|lunch|dinner|snack)\b',
r'\bas\s+(?:a\s+)?(breakfast|lunch|dinner|snack)\b',
r'\b(breakfast|lunch|dinner|snack)\s*[:\-]\s*',
r'\bto\s+(breakfast|lunch|dinner|snack)\b',
]
for pattern in meal_patterns:
m = re.search(pattern, text, re.IGNORECASE)
if m:
result['meal_type'] = m.group(1).lower()
text = text[:m.start()] + text[m.end():]
text = text.strip().rstrip('.')
break
# Split into sentences for better parsing
sentences = re.split(r'[.!]\s+', text)
main_sentence = sentences[0] if sentences else text
extra_sentences = sentences[1:] if len(sentences) > 1 else []
# Extract exclusions from extra sentences and main: "no sour cream", "not the strawberry", "make sure..."
exclusion_parts = []
modifier_parts = []
remaining_extras = []
for sent in extra_sentences:
sent = sent.strip().rstrip('.')
if re.match(r'^(?:no|not|without|don\'?t|never)\b', sent, re.IGNORECASE):
exclusion_parts.append(sent)
elif re.match(r'^(?:just|only|make\s+sure|with|extra|light|heavy)\b', sent, re.IGNORECASE):
# "Make sure it is only vanilla ice cream" -> modifier
modifier_parts.append(sent)
elif re.match(r'^(\d+)\s+(?:of\s+them|small|medium|large)', sent, re.IGNORECASE):
# "3 of them", "3 small ones" -> quantity override
qty_m = re.match(r'^(\d+)', sent)
if qty_m:
result['quantity'] = float(qty_m.group(1))
# Check for size modifier
size_m = re.search(r'\b(small|medium|large)\b', sent, re.IGNORECASE)
if size_m:
modifier_parts.append(size_m.group(0).lower())
else:
remaining_extras.append(sent)
# Also extract from main sentence
for pattern in [r'\.\s*(?:no|not|without|don\'?t)\s+[^.]*']:
for m in re.finditer(pattern, main_sentence, re.IGNORECASE):
exclusion_parts.append(m.group().strip().lstrip('. '))
main_sentence = re.sub(pattern, '', main_sentence, flags=re.IGNORECASE)
# Extract inline modifiers from main: "light sauce", "with cheese"
for pattern in [r'\b(?:just|only|with|extra|light|heavy)\s+[\w,\s]+$']:
m = re.search(pattern, main_sentence, re.IGNORECASE)
if m:
modifier_parts.append(m.group().strip())
main_sentence = main_sentence[:m.start()].strip()
if exclusion_parts:
result['exclusions'] = '; '.join(exclusion_parts)
if modifier_parts:
result['modifiers'] = '; '.join(modifier_parts)
# Extract brand: "from braums", "from mcdonalds"
brand_match = re.search(r'\bfrom\s+(\w[\w\s\']*?)(?:\s*[.,]|\s*$)', main_sentence, re.IGNORECASE)
if brand_match:
result['brand'] = brand_match.group(1).strip()
main_sentence = main_sentence[:brand_match.start()] + main_sentence[brand_match.end():]
# Clean up main sentence
main_sentence = re.sub(r'\s+', ' ', main_sentence).strip().rstrip('.,;')
# Handle "N of them" pattern still in main sentence
of_them = re.search(r'\.?\s*(\d+)\s+of\s+them\b', main_sentence, re.IGNORECASE)
if of_them:
result['quantity'] = float(of_them.group(1))
main_sentence = main_sentence[:of_them.start()].strip()
# Now parse quantity from the cleaned food description
parsed_qty = _parse_quantity_from_phrase(main_sentence)
# Only override quantity from phrase parsing if we didn't already get it from "N of them"/"N small ones"
if result['quantity'] == 1.0 or parsed_qty['quantity'] != 1.0:
if parsed_qty['quantity'] != 1.0:
result['quantity'] = parsed_qty['quantity']
result['unit'] = parsed_qty['unit']
result['food_description'] = parsed_qty['food_name']
# Strip filler: "a", "an", "some", "single" from start
result['food_description'] = re.sub(r'^(?:a|an|some)\s+', '', result['food_description'], flags=re.IGNORECASE).strip()
# Handle "single scoop X" -> qty=1, unit=scoop, food=X
scoop_match = re.match(r'^(?:single\s+)?(?:scoop|scoops)\s+(?:of\s+)?(.+)$', result['food_description'], re.IGNORECASE)
if scoop_match:
result['unit'] = 'scoop'
result['food_description'] = scoop_match.group(1).strip()
# Also catch "scoop of X" in the food name
scoop_match2 = re.match(r'^(?:scoop|scoops)\s+(?:of\s+)?(.+)$', result['food_description'], re.IGNORECASE)
if scoop_match2:
result['unit'] = 'scoop'
result['food_description'] = scoop_match2.group(1).strip()
# Remove "single" prefix if still there
result['food_description'] = re.sub(r'^single\s+', '', result['food_description'], flags=re.IGNORECASE).strip()
return result
def _ai_split_items(phrase: str) -> list[str]:
"""Use AI to split a multi-food phrase into individual items.
Returns a list of strings, each describing one food with its quantity.
Falls back to returning the original phrase as a single item if AI unavailable.
"""
if not OPENAI_API_KEY:
return [phrase]
prompt = f"""Split this food description into individual food items. Each item should include its quantity.
Input: "{phrase}"
Rules:
- Return a JSON array of strings
- Each string is one food item with its quantity
- Keep quantities attached to their food: "2 eggs" stays as "2 eggs", not "eggs"
- "half an egg" "0.5 egg"
- "one porotta" "1 porotta"
- If it's clearly one dish (like "chicken fried rice" or "egg sandwich"), keep it as one item
- Convert words to numbers: "one" "1", "two" "2", "half" "0.5"
- Return ONLY the JSON array, no other text
Examples:
- "2 eggs and toast" ["2 eggs", "1 toast"]
- "half egg and one porotta" ["0.5 egg", "1 porotta"]
- "1 porotta, 1 egg, 1 slice cheese" ["1 porotta", "1 egg", "1 slice cheese"]
- "chicken fried rice" ["1 chicken fried rice"]
- "egg sandwich with cheese" ["1 egg sandwich with cheese"]
- "2 rotis with dal and rice" ["2 roti", "1 dal", "1 rice"]"""
try:
req_body = json.dumps({
"model": OPENAI_MODEL,
"messages": [
{"role": "system", "content": "You split food descriptions into individual items. Return ONLY a JSON array of strings."},
{"role": "user", "content": prompt}
],
"temperature": 0.1,
}).encode('utf-8')
req = urllib.request.Request(
"https://api.openai.com/v1/chat/completions",
data=req_body,
headers={
'Content-Type': 'application/json',
'Authorization': f'Bearer {OPENAI_API_KEY}',
},
)
with urllib.request.urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode('utf-8'))
content = data['choices'][0]['message']['content'].strip()
if content.startswith('```'):
content = content.split('```')[1]
if content.startswith('json'):
content = content[4:]
items = json.loads(content)
if isinstance(items, list) and len(items) > 0:
return [str(i).strip() for i in items if str(i).strip()]
except Exception as e:
logger.warning(f"AI split failed: {e}")
return [phrase]
# ─── AI nutrition estimation ─────────────────────────────────────────────────
def _ai_estimate_nutrition(food_description: str, modifiers: str = None,
exclusions: str = None, brand: str = None,
quantity: float = 1, unit: str = 'serving') -> dict | None:
"""Use OpenAI to estimate nutrition for a food item.
Returns: {food_name, calories_per_base, protein_per_base, carbs_per_base, fat_per_base,
base_unit, serving_description, estimated_grams, confidence}
"""
if not OPENAI_API_KEY:
return None
# Build the prompt
desc_parts = [food_description]
if brand:
desc_parts.append(f"from {brand}")
if modifiers:
desc_parts.append(f"({modifiers})")
if exclusions:
desc_parts.append(f"[{exclusions}]")
full_description = ' '.join(desc_parts)
prompt = f"""Estimate nutritional information for: {full_description}
Quantity specified: {quantity} {unit}
Return a JSON object with these fields:
- food_name: The FULL name of what was actually eaten, including key additions/toppings. Title case. Include brand if relevant.
Examples:
- "yogurt with a sprinkle of honey" "Yogurt With Honey"
- "mince tacos no sour cream" "Mince Tacos (No Sour Cream)"
- "smash burger" "Homemade Smash Burger"
- "Bellwether Farms yogurt with honey" "Bellwether Farms Yogurt With Honey"
Do NOT strip additions that change nutrition (honey, cheese, sauce, toppings).
DO strip quantities/measurements from the name.
- display_name: Short version for reuse, without one-off modifiers. E.g. "Bellwether Farms Yogurt" (without the honey)
- calories: Total calories for the ENTIRE specified quantity ({quantity} {unit})
- protein: Total grams of protein for the entire quantity
- carbs: Total grams of carbohydrates for the entire quantity
- fat: Total grams of fat for the entire quantity
- per_serving_calories: Calories for ONE serving/piece
- per_serving_protein: Protein for ONE serving/piece
- per_serving_carbs: Carbs for ONE serving/piece
- per_serving_fat: Fat for ONE serving/piece
- base_unit: What one unit is "piece", "scoop", "serving", "slice", etc.
- serving_description: Human-readable serving label, e.g. "1 taco", "1 scoop", "1 small pie"
- estimated_grams: Approximate grams per serving
- confidence: "high", "medium", or "low"
IMPORTANT:
- Account for all modifiers, additions, and exclusions in the calorie/macro estimate
- The food_name should reflect what was ACTUALLY eaten (with honey, without sour cream, etc.)
- The display_name should be the reusable base food (without one-off additions)
- If a brand is specified, use brand-specific nutrition if you know it
- Be realistic about portion sizes
- Return ONLY the JSON object, no other text"""
try:
req_body = json.dumps({
"model": OPENAI_MODEL,
"messages": [
{"role": "system", "content": "You are a nutrition expert. Return accurate JSON nutrition estimates only."},
{"role": "user", "content": prompt}
],
"temperature": 0.2,
}).encode('utf-8')
req = urllib.request.Request(
"https://api.openai.com/v1/chat/completions",
data=req_body,
headers={
'Content-Type': 'application/json',
'Authorization': f'Bearer {OPENAI_API_KEY}',
},
)
with urllib.request.urlopen(req, timeout=30) as resp:
data = json.loads(resp.read().decode('utf-8'))
content = data['choices'][0]['message']['content'].strip()
# Strip markdown code blocks if present
if content.startswith('```'):
content = content.split('```')[1]
if content.startswith('json'):
content = content[4:]
result = json.loads(content)
# Validate and normalize
confidence_map = {'high': 0.85, 'medium': 0.65, 'low': 0.4}
return {
'food_name': str(result.get('food_name', food_description)).strip(),
'display_name': str(result.get('display_name', result.get('food_name', food_description))).strip(),
'calories': float(result.get('calories', 0)),
'protein': float(result.get('protein', 0)),
'carbs': float(result.get('carbs', 0)),
'fat': float(result.get('fat', 0)),
'calories_per_base': float(result.get('per_serving_calories', result.get('calories', 0))),
'protein_per_base': float(result.get('per_serving_protein', result.get('protein', 0))),
'carbs_per_base': float(result.get('per_serving_carbs', result.get('carbs', 0))),
'fat_per_base': float(result.get('per_serving_fat', result.get('fat', 0))),
'base_unit': str(result.get('base_unit', 'serving')),
'serving_description': str(result.get('serving_description', f'1 {unit}')),
'estimated_grams': float(result.get('estimated_grams', 0)) if result.get('estimated_grams') else None,
'confidence': confidence_map.get(result.get('confidence', 'medium'), 0.65),
'source': 'ai',
}
except Exception as e:
logger.warning(f"AI estimation failed: {e}")
return None
def _parse_quantity_from_phrase(phrase: str) -> dict:
"""Extract quantity, unit, and clean food name from a raw phrase.
Examples:
'2 cups rice' -> {quantity: 2, unit: 'cup', food_name: 'rice'}
'3 chicken breasts' -> {quantity: 3, unit: 'piece', food_name: 'chicken breasts'}
'small bowl biryani' -> {quantity: 1, unit: 'small bowl', food_name: 'biryani'}
'chicken breast' -> {quantity: 1, unit: 'serving', food_name: 'chicken breast'}
"""
phrase = phrase.strip()
# Household portions
household = {
'small bowl': 'small bowl', 'medium bowl': 'medium bowl', 'large bowl': 'large bowl',
'small plate': 'small plate', 'medium plate': 'medium plate', 'full plate': 'full plate',
'half plate': 'half plate', 'handful': 'handful', 'bite': 'bite',
}
for pattern, unit in household.items():
match = re.match(rf'^(?:(\d+\.?\d*)\s+)?{re.escape(pattern)}\s+(?:of\s+)?(.+)$', phrase, re.IGNORECASE)
if match:
return {'quantity': float(match.group(1) or 1), 'unit': unit, 'food_name': match.group(2).strip()}
# Numeric + unit patterns: "2 cups rice", "4oz chicken", "100g rice"
unit_map = {
'cups?': 'cup', 'tbsps?|tablespoons?': 'tbsp', 'tsps?|teaspoons?': 'tsp',
'oz|ounces?': 'oz', 'g|grams?': 'g', 'pieces?': 'piece', 'slices?': 'slice',
'servings?': 'serving',
}
for pattern, unit in unit_map.items():
match = re.match(rf'^(\d+\.?\d*)\s*(?:{pattern})\s+(?:of\s+)?(.+)$', phrase, re.IGNORECASE)
if match:
return {'quantity': float(match.group(1)), 'unit': unit, 'food_name': match.group(2).strip()}
# Just a number prefix: "2 chicken breasts", "3 eggs"
match = re.match(r'^(\d+\.?\d*)\s+(.+)$', phrase)
if match:
qty = float(match.group(1))
if qty <= 50: # Reasonable food quantity
return {'quantity': qty, 'unit': 'piece', 'food_name': match.group(2).strip()}
return {'quantity': 1, 'unit': 'serving', 'food_name': phrase}
# Confidence thresholds (conservative: prefer false negatives over wrong auto-matches)
THRESHOLD_AUTO_MATCH = 0.9
THRESHOLD_CONFIRM = 0.65
def resolve_food(raw_phrase: str, user_id: str, meal_type: str = None,
portion_text: str = None, entry_date: str = None,
source: str = 'api') -> dict:
"""
Smart food resolution endpoint.
Resolution chain: quick-add check parse NL local DB AI estimation queue
Stable response shape:
{
resolution_type: matched | confirm | queued | quick_add | ai_estimated
confidence: float
matched_food: {...} | null
candidate_foods: [...] | []
ai_estimate: {...} | null
parsed: {quantity, unit, food_description, meal_type, brand, modifiers, exclusions}
raw_text: str
queue_id: str | null
reason: str
}
"""
# Step 0: Parse natural language
parsed = parse_food_request(raw_phrase)
# Use explicitly provided meal_type if given, otherwise use parsed
effective_meal = meal_type or parsed['meal_type']
base_response = {
'resolution_type': None,
'confidence': 0.0,
'matched_food': None,
'candidate_foods': [],
'ai_estimate': None,
'parsed': {
'quantity': parsed['quantity'],
'unit': parsed['unit'],
'food_description': parsed['food_description'],
'meal_type': effective_meal,
'brand': parsed['brand'],
'modifiers': parsed['modifiers'],
'exclusions': parsed['exclusions'],
},
'raw_text': raw_phrase,
'queue_id': None,
'reason': None,
}
# Step 1: Quick-add check ("450 calories", "300cal")
quick_add_match = re.match(
r'^(?:(?:log|add|track)\s+)?(\d+\.?\d*)\s*(?:cal(?:ories)?|kcal)(?:\s+.*)?$',
raw_phrase.strip(), re.IGNORECASE
)
if quick_add_match:
calories = float(quick_add_match.group(1))
return {**base_response,
'resolution_type': 'quick_add',
'confidence': 1.0,
'parsed': {**base_response['parsed'], 'quantity': calories, 'unit': 'kcal'},
'reason': 'Detected quick-add calorie pattern',
}
# Step 2: Search local DB
food_name = parsed['food_description']
candidates = search_foods(food_name, user_id, limit=5)
# Step 2b: Retry with singularized name if no strong match
best_local_score = candidates[0]['score'] if candidates else 0
if best_local_score < THRESHOLD_AUTO_MATCH:
alt_name = _naive_singularize(food_name)
if alt_name.lower() != food_name.lower():
alt_candidates = search_foods(alt_name, user_id, limit=5)
seen_ids = {c['id'] for c in candidates}
for ac in alt_candidates:
if ac['id'] not in seen_ids:
candidates.append(ac)
seen_ids.add(ac['id'])
candidates.sort(key=lambda c: c['score'], reverse=True)
best_local_score = candidates[0]['score'] if candidates else 0
# Build note from modifiers/exclusions (used for all match types)
mod_note_parts = []
if parsed['modifiers']:
mod_note_parts.append(parsed['modifiers'])
if parsed['exclusions']:
mod_note_parts.append(parsed['exclusions'])
mod_note = '; '.join(mod_note_parts) if mod_note_parts else None
# Build snapshot name override: food name + modifiers if present
snapshot_override = None
if mod_note_parts and candidates:
base_name = candidates[0]['name']
snapshot_override = f"{base_name} ({', '.join(mod_note_parts)})"
if best_local_score >= THRESHOLD_AUTO_MATCH:
return {**base_response,
'resolution_type': 'matched',
'confidence': best_local_score,
'matched_food': candidates[0],
'candidate_foods': candidates[:3],
'snapshot_name_override': snapshot_override,
'note': mod_note,
'reason': f'High confidence local match ({best_local_score:.2f}) via {candidates[0]["match_type"]}',
}
# Step 3: AI estimation (if OpenAI is configured and we don't have a strong local match)
ai_estimate = None
if best_local_score < THRESHOLD_CONFIRM:
ai_estimate = _ai_estimate_nutrition(
food_description=parsed['food_description'],
modifiers=parsed['modifiers'],
exclusions=parsed['exclusions'],
brand=parsed['brand'],
quantity=parsed['quantity'],
unit=parsed['unit'],
)
if ai_estimate:
base_response['ai_estimate'] = ai_estimate
# Step 5: Decide resolution type based on what we found
# Local confirm-level match exists — auto-match it (no queue)
if best_local_score >= THRESHOLD_CONFIRM:
return {**base_response,
'resolution_type': 'matched',
'confidence': best_local_score,
'matched_food': candidates[0],
'candidate_foods': candidates[:3],
'snapshot_name_override': snapshot_override,
'note': mod_note,
'reason': f'Auto-matched local food ({best_local_score:.2f}) via {candidates[0]["match_type"]}',
}
# AI estimated successfully — check for existing canonical food before creating
if ai_estimate:
ai_display_name = ai_estimate.get('display_name', ai_estimate['food_name'])
# Pre-creation dedup: search for the AI's canonical name in local DB
dedup_candidates = search_foods(ai_display_name, user_id, limit=3)
# Also try singularized form
alt_dedup = _naive_singularize(ai_display_name)
if alt_dedup.lower() != ai_display_name.lower():
for ac in search_foods(alt_dedup, user_id, limit=3):
if not any(c['id'] == ac['id'] for c in dedup_candidates):
dedup_candidates.append(ac)
dedup_candidates.sort(key=lambda c: c['score'], reverse=True)
existing_match = None
for dc in dedup_candidates:
if dc['score'] >= THRESHOLD_CONFIRM:
existing_match = dc
break
if existing_match:
# Reuse existing canonical food — no new food created
logger.info(f"Dedup: reusing existing '{existing_match['name']}' for AI input '{ai_display_name}'")
matched = {
'id': existing_match['id'],
'name': existing_match['name'],
'brand': existing_match.get('brand'),
'base_unit': existing_match.get('base_unit', 'serving'),
'calories_per_base': existing_match.get('calories_per_base', 0),
'protein_per_base': existing_match.get('protein_per_base', 0),
'carbs_per_base': existing_match.get('carbs_per_base', 0),
'fat_per_base': existing_match.get('fat_per_base', 0),
'status': existing_match.get('status', 'confirmed'),
'servings': existing_match.get('servings', []),
'score': existing_match['score'],
'match_type': 'ai_dedup_matched',
}
else:
# No existing match — create new canonical food
new_food = create_food({
'name': ai_display_name,
'brand': parsed['brand'],
'calories_per_base': ai_estimate['calories_per_base'],
'protein_per_base': ai_estimate['protein_per_base'],
'carbs_per_base': ai_estimate['carbs_per_base'],
'fat_per_base': ai_estimate['fat_per_base'],
'base_unit': ai_estimate['base_unit'],
'status': 'ai_created',
'notes': f"AI estimated from: {raw_phrase}",
'servings': [{
'name': ai_estimate['serving_description'],
'amount_in_base': 1.0,
'is_default': True,
}],
}, user_id)
matched = {
'id': new_food['id'],
'name': new_food['name'],
'brand': new_food.get('brand'),
'base_unit': new_food.get('base_unit', 'serving'),
'calories_per_base': new_food.get('calories_per_base', 0),
'protein_per_base': new_food.get('protein_per_base', 0),
'carbs_per_base': new_food.get('carbs_per_base', 0),
'fat_per_base': new_food.get('fat_per_base', 0),
'status': 'ai_created',
'servings': new_food.get('servings', []),
'score': ai_estimate['confidence'],
'match_type': 'ai_created',
}
# Build note from modifiers/exclusions
note_parts = []
if parsed['modifiers']:
note_parts.append(parsed['modifiers'])
if parsed['exclusions']:
note_parts.append(parsed['exclusions'])
return {**base_response,
'resolution_type': 'ai_estimated',
'confidence': ai_estimate['confidence'],
'matched_food': matched,
'snapshot_name_override': ai_estimate['food_name'], # Full name with modifiers for the entry
'note': '; '.join(note_parts) if note_parts else None,
'reason': f'AI estimated nutrition for "{ai_estimate["food_name"]}" (confidence: {ai_estimate["confidence"]:.2f})',
}
# Nothing found — return as quick_add so the entry can still be created
# The gateway/frontend will handle it as a basic calorie entry
return {**base_response,
'resolution_type': 'quick_add',
'confidence': 0.0,
'candidate_foods': candidates[:3],
'reason': 'No match found — use as quick add entry',
}
def _create_resolution_queue_entry(user_id, raw_text, candidates, confidence,
meal_type=None, entry_date=None,
source=None, proposed_food_id=None) -> str:
"""Create an entry in the food resolution queue."""
queue_id = str(uuid.uuid4())
conn = get_db()
conn.execute(
"""INSERT INTO food_resolution_queue
(id, user_id, raw_text, proposed_food_id, candidates_json, confidence,
meal_type, entry_date, source)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(queue_id, user_id, raw_text, proposed_food_id,
json.dumps([{'food_id': c['id'], 'name': c['name'], 'score': c['score']} for c in candidates]),
confidence, meal_type, entry_date, source)
)
conn.commit()
conn.close()
return queue_id
# ─── Food CRUD helpers ──────────────────────────────────────────────────────
def get_food_servings(food_id: str) -> list:
"""Get all serving definitions for a food."""
conn = get_db()
rows = conn.execute(
"SELECT * FROM food_servings WHERE food_id = ? ORDER BY is_default DESC, name",
(food_id,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def create_food(data: dict, user_id: str) -> dict:
"""Create a new food with optional servings and aliases."""
food_id = str(uuid.uuid4())
name = data.get('name', '').strip()
normalized = normalize_food_name(name)
brand = data.get('brand')
brand_norm = normalize_food_name(brand) if brand else None
conn = get_db()
conn.execute(
"""INSERT INTO foods
(id, name, normalized_name, brand, brand_normalized, barcode, notes,
calories_per_base, protein_per_base, carbs_per_base, fat_per_base,
base_unit, status, created_by_user_id, is_shared)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(food_id, name, normalized, brand, brand_norm, data.get('barcode'),
data.get('notes'),
data.get('calories_per_base', 0), data.get('protein_per_base', 0),
data.get('carbs_per_base', 0), data.get('fat_per_base', 0),
data.get('base_unit', '100g'), data.get('status', 'confirmed'),
user_id, 1 if data.get('is_shared', True) else 0)
)
# Create default serving if provided
servings = data.get('servings', [])
if not servings:
# Auto-create a default "1 serving" entry
servings = [{'name': f'1 {data.get("base_unit", "serving")}', 'amount_in_base': 1.0, 'is_default': True}]
for s in servings:
conn.execute(
"INSERT INTO food_servings (id, food_id, name, amount_in_base, is_default) VALUES (?, ?, ?, ?, ?)",
(str(uuid.uuid4()), food_id, s.get('name', '1 serving'),
s.get('amount_in_base', 1.0), 1 if s.get('is_default') else 0)
)
# Create aliases from name tokens
_auto_create_aliases(conn, food_id, name)
conn.commit()
conn.close()
# Auto-fetch image in background
auto_fetch_food_image(food_id, name, data.get('brand'))
return get_food_by_id(food_id)
def _auto_create_aliases(conn, food_id: str, name: str):
"""Auto-create aliases from the food name (e.g., 'Grilled Chicken Breast' -> ['chicken breast', 'grilled chicken'])."""
normalized = normalize_food_name(name)
tokens = normalized.split()
aliases_to_add = set()
aliases_to_add.add(normalized)
# Add subsets of 2+ consecutive tokens
if len(tokens) >= 2:
for i in range(len(tokens)):
for j in range(i + 2, len(tokens) + 1):
subset = ' '.join(tokens[i:j])
if len(subset) >= 3:
aliases_to_add.add(subset)
for alias in aliases_to_add:
try:
conn.execute(
"INSERT OR IGNORE INTO food_aliases (id, food_id, alias, alias_normalized) VALUES (?, ?, ?, ?)",
(str(uuid.uuid4()), food_id, alias, alias)
)
except sqlite3.IntegrityError:
pass # Alias already exists for another food
def get_food_by_id(food_id: str) -> dict | None:
"""Get a food by ID with its servings and aliases."""
conn = get_db()
row = conn.execute("SELECT * FROM foods WHERE id = ?", (food_id,)).fetchone()
if not row:
conn.close()
return None
food = dict(row)
food['servings'] = [dict(r) for r in conn.execute(
"SELECT * FROM food_servings WHERE food_id = ? ORDER BY is_default DESC", (food_id,)
).fetchall()]
food['aliases'] = [dict(r) for r in conn.execute(
"SELECT * FROM food_aliases WHERE food_id = ?", (food_id,)
).fetchall()]
conn.close()
return food
def _audit_log(user_id: str, action: str, entity_type: str, entity_id: str, details: dict = None):
"""Write an audit log entry."""
conn = get_db()
conn.execute(
"INSERT INTO audit_log (id, user_id, action, entity_type, entity_id, details) VALUES (?, ?, ?, ?, ?, ?)",
(str(uuid.uuid4()), user_id, action, entity_type, entity_id,
json.dumps(details) if details else None)
)
conn.commit()
conn.close()
def merge_foods(source_id: str, target_id: str, user_id: str = None) -> dict:
"""Merge source food into target: move aliases, repoint entries, archive source."""
conn = get_db()
source = conn.execute("SELECT * FROM foods WHERE id = ?", (source_id,)).fetchone()
target = conn.execute("SELECT * FROM foods WHERE id = ?", (target_id,)).fetchone()
if not source or not target:
conn.close()
return {'error': 'Source or target food not found'}
# Move aliases from source to target (skip duplicates)
source_aliases = conn.execute("SELECT * FROM food_aliases WHERE food_id = ?", (source_id,)).fetchall()
for alias in source_aliases:
try:
conn.execute(
"UPDATE food_aliases SET food_id = ? WHERE id = ?",
(target_id, alias['id'])
)
except sqlite3.IntegrityError:
conn.execute("DELETE FROM food_aliases WHERE id = ?", (alias['id'],))
# Add source name as alias on target
try:
conn.execute(
"INSERT OR IGNORE INTO food_aliases (id, food_id, alias, alias_normalized) VALUES (?, ?, ?, ?)",
(str(uuid.uuid4()), target_id, source['name'], normalize_food_name(source['name']))
)
except sqlite3.IntegrityError:
pass
# Move food_entries references (snapshots are immutable, just update food_id reference)
conn.execute("UPDATE food_entries SET food_id = ? WHERE food_id = ?", (target_id, source_id))
# Move template items
conn.execute("UPDATE meal_template_items SET food_id = ? WHERE food_id = ?", (target_id, source_id))
# Move favorites
conn.execute("DELETE FROM user_favorites WHERE food_id = ?", (source_id,))
# Move servings (skip duplicates by name)
source_servings = conn.execute("SELECT * FROM food_servings WHERE food_id = ?", (source_id,)).fetchall()
for serving in source_servings:
existing = conn.execute(
"SELECT id FROM food_servings WHERE food_id = ? AND name = ?",
(target_id, serving['name'])
).fetchone()
if not existing:
conn.execute(
"UPDATE food_servings SET food_id = ? WHERE id = ?",
(target_id, serving['id'])
)
else:
conn.execute("DELETE FROM food_servings WHERE id = ?", (serving['id'],))
# Archive source
conn.execute("UPDATE foods SET status = 'archived' WHERE id = ?", (source_id,))
conn.commit()
conn.close()
# Audit trail
_audit_log(user_id, 'food_merged', 'food', target_id, {
'source_id': source_id, 'source_name': source['name'],
'target_id': target_id, 'target_name': target['name'],
})
return {'success': True, 'source_id': source_id, 'target_id': target_id}
# ─── Entry helpers ───────────────────────────────────────────────────────────
def calculate_entry_nutrition(food: dict, quantity: float, serving_id: str = None) -> dict:
"""Calculate nutrition for a food entry based on quantity and serving."""
base_calories = food.get('calories_per_base', 0)
base_protein = food.get('protein_per_base', 0)
base_carbs = food.get('carbs_per_base', 0)
base_fat = food.get('fat_per_base', 0)
# If a specific serving is selected, multiply by its base amount
multiplier = quantity
if serving_id:
conn = get_db()
serving = conn.execute("SELECT * FROM food_servings WHERE id = ?", (serving_id,)).fetchone()
conn.close()
if serving:
multiplier = quantity * serving['amount_in_base']
return {
'calories': round(base_calories * multiplier, 1),
'protein': round(base_protein * multiplier, 1),
'carbs': round(base_carbs * multiplier, 1),
'fat': round(base_fat * multiplier, 1),
}
def create_food_entry(data: dict, user_id: str) -> dict:
"""Create a food log entry with immutable nutrition snapshot."""
# Idempotency check: if key provided and already used, return existing entry
idempotency_key = data.get('idempotency_key')
if idempotency_key:
conn = get_db()
existing = conn.execute(
"SELECT * FROM food_entries WHERE idempotency_key = ?", (idempotency_key,)
).fetchone()
conn.close()
if existing:
return dict(existing)
entry_id = str(uuid.uuid4())
entry_type = data.get('entry_type', 'food')
food_id = data.get('food_id')
meal_type = data.get('meal_type', 'snack').lower()
entry_date = data.get('entry_date', date.today().isoformat())
quantity = data.get('quantity', 1.0)
unit = data.get('unit', 'serving')
serving_description = data.get('serving_description')
source = data.get('source', 'web')
entry_method = data.get('entry_method', 'manual')
image_ref = data.get('image_ref')
snapshot_serving_label = None
snapshot_grams = data.get('snapshot_grams')
# Quick-add: just calories, no food reference
if entry_type == 'quick_add':
snapshot_name = data.get('snapshot_food_name', 'Quick add')
snapshot_cals = data.get('snapshot_calories', 0)
snapshot_protein = data.get('snapshot_protein', 0)
snapshot_carbs = data.get('snapshot_carbs', 0)
snapshot_fat = data.get('snapshot_fat', 0)
entry_method = 'quick_add'
else:
# Get food and calculate nutrition snapshot
food = get_food_by_id(food_id)
if not food:
return {'error': 'Food not found'}
serving_id = data.get('serving_id')
nutrition = calculate_entry_nutrition(food, quantity, serving_id)
snapshot_name = data.get('snapshot_food_name_override') or food['name']
snapshot_cals = nutrition['calories']
snapshot_protein = nutrition['protein']
snapshot_carbs = nutrition['carbs']
snapshot_fat = nutrition['fat']
# Resolve serving label and grams for snapshot
if serving_id:
for s in food.get('servings', []):
if s['id'] == serving_id:
snapshot_serving_label = s['name']
if not serving_description:
serving_description = f"{quantity} x {s['name']}"
# Calculate grams if base_unit is 100g
if food.get('base_unit') == '100g':
snapshot_grams = round(quantity * s['amount_in_base'] * 100, 1)
break
elif food.get('base_unit') == '100g':
snapshot_grams = round(quantity * 100, 1)
conn = get_db()
conn.execute(
"""INSERT INTO food_entries
(id, user_id, food_id, meal_type, entry_date, entry_type,
quantity, unit, serving_description,
snapshot_food_name, snapshot_serving_label, snapshot_grams,
snapshot_calories, snapshot_protein, snapshot_carbs, snapshot_fat,
source, entry_method, raw_text, confidence_score, note, image_ref,
ai_metadata, idempotency_key)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(entry_id, user_id, food_id, meal_type, entry_date, entry_type,
quantity, unit, serving_description,
snapshot_name, snapshot_serving_label, snapshot_grams,
snapshot_cals, snapshot_protein, snapshot_carbs, snapshot_fat,
source, entry_method, data.get('raw_text'), data.get('confidence_score'),
data.get('note'), image_ref,
json.dumps(data.get('ai_metadata')) if data.get('ai_metadata') else None,
idempotency_key)
)
conn.commit()
conn.close()
return {
'id': entry_id,
'food_id': food_id,
'meal_type': meal_type,
'entry_date': entry_date,
'entry_type': entry_type,
'quantity': quantity,
'unit': unit,
'serving_description': serving_description,
'snapshot_food_name': snapshot_name,
'snapshot_serving_label': snapshot_serving_label,
'snapshot_grams': snapshot_grams,
'snapshot_calories': snapshot_cals,
'snapshot_protein': snapshot_protein,
'snapshot_carbs': snapshot_carbs,
'snapshot_fat': snapshot_fat,
'source': source,
'entry_method': entry_method,
}
def get_entries_by_date(user_id: str, entry_date: str) -> list:
"""Get all food entries for a user on a specific date, with food image."""
conn = get_db()
rows = conn.execute(
"""SELECT fe.*, f.image_path as food_image_path
FROM food_entries fe
LEFT JOIN foods f ON fe.food_id = f.id
WHERE fe.user_id = ? AND fe.entry_date = ?
ORDER BY
CASE fe.meal_type
WHEN 'breakfast' THEN 1
WHEN 'lunch' THEN 2
WHEN 'dinner' THEN 3
WHEN 'snack' THEN 4
ELSE 5
END,
fe.created_at""",
(user_id, entry_date)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def get_daily_totals(user_id: str, entry_date: str) -> dict:
"""Get daily nutrition totals."""
conn = get_db()
row = conn.execute(
"""SELECT
COALESCE(SUM(snapshot_calories), 0) as total_calories,
COALESCE(SUM(snapshot_protein), 0) as total_protein,
COALESCE(SUM(snapshot_carbs), 0) as total_carbs,
COALESCE(SUM(snapshot_fat), 0) as total_fat,
COUNT(*) as entry_count
FROM food_entries
WHERE user_id = ? AND entry_date = ?""",
(user_id, entry_date)
).fetchone()
conn.close()
return dict(row)
def get_goals_for_date(user_id: str, for_date: str) -> dict | None:
"""Get the goal for a user on a specific date.
Uses date ranges, not is_active flag historical goals remain queryable."""
conn = get_db()
row = conn.execute(
"""SELECT * FROM goals
WHERE user_id = ? AND start_date <= ?
AND (end_date IS NULL OR end_date >= ?)
ORDER BY start_date DESC LIMIT 1""",
(user_id, for_date, for_date)
).fetchone()
conn.close()
if row:
return dict(row)
return None
def get_recent_foods(user_id: str, limit: int = 20) -> list:
"""Get recently logged foods for a user (deduplicated)."""
conn = get_db()
rows = conn.execute(
"""SELECT DISTINCT fe.food_id, fe.snapshot_food_name, f.calories_per_base,
f.protein_per_base, f.carbs_per_base, f.fat_per_base, f.base_unit,
MAX(fe.created_at) as last_used
FROM food_entries fe
JOIN foods f ON fe.food_id = f.id
WHERE fe.user_id = ? AND fe.food_id IS NOT NULL AND f.status != 'archived'
GROUP BY fe.food_id
ORDER BY last_used DESC
LIMIT ?""",
(user_id, limit)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def get_frequent_foods(user_id: str, limit: int = 20) -> list:
"""Get most frequently logged foods for a user."""
conn = get_db()
rows = conn.execute(
"""SELECT fe.food_id, fe.snapshot_food_name, f.calories_per_base,
f.protein_per_base, f.carbs_per_base, f.fat_per_base, f.base_unit,
COUNT(*) as use_count, MAX(fe.created_at) as last_used
FROM food_entries fe
JOIN foods f ON fe.food_id = f.id
WHERE fe.user_id = ? AND fe.food_id IS NOT NULL AND f.status != 'archived'
GROUP BY fe.food_id
ORDER BY use_count DESC
LIMIT ?""",
(user_id, limit)
).fetchall()
conn.close()
return [dict(r) for r in rows]
# ─── HTTP Handler ────────────────────────────────────────────────────────────
class CalorieHandler(BaseHTTPRequestHandler):
"""HTTP request handler for the Calorie Tracker API."""
def _get_user(self) -> dict | None:
"""Authenticate the request. Supports session cookie, Bearer token, or API key + telegram_id."""
# Check session cookie
cookie = SimpleCookie(self.headers.get('Cookie', ''))
if 'session' in cookie:
user = get_user_from_session(cookie['session'].value)
if user:
return user
# Check Authorization header
auth = self.headers.get('Authorization', '')
if auth.startswith('Bearer '):
token = auth[7:]
user = get_user_from_session(token)
if user:
return user
# Check API key + telegram_user_id (service-to-service)
api_key = self.headers.get('X-API-Key', '')
telegram_id = self.headers.get('X-Telegram-User-Id', '')
if api_key and telegram_id:
user = get_user_from_api_key_and_telegram(api_key, telegram_id)
if user:
return user
return None
def _send_json(self, data, status=200):
"""Send a JSON response."""
body = json.dumps(data, default=str).encode('utf-8')
self.send_response(status)
self.send_header('Content-Type', 'application/json')
self.send_header('Content-Length', len(body))
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Headers', 'Content-Type, Authorization, X-API-Key, X-Telegram-User-Id')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, PATCH, PUT, DELETE, OPTIONS')
self.end_headers()
self.wfile.write(body)
def _read_body(self) -> dict:
"""Read and parse JSON request body."""
length = int(self.headers.get('Content-Length', 0))
if length == 0:
return {}
body = self.rfile.read(length)
return json.loads(body)
def _require_auth(self) -> dict | None:
"""Require authentication; send 401 if not authenticated."""
user = self._get_user()
if not user:
self._send_json({'error': 'Unauthorized'}, 401)
return None
return user
def do_OPTIONS(self):
"""Handle CORS preflight."""
self.send_response(204)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Headers', 'Content-Type, Authorization, X-API-Key, X-Telegram-User-Id')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, PATCH, PUT, DELETE, OPTIONS')
self.end_headers()
def do_GET(self):
parsed = urlparse(self.path)
path = parsed.path.rstrip('/')
params = parse_qs(parsed.query)
# ── Public routes ──
if path == '/api/health':
return self._send_json({'status': 'ok'})
# ── Serve food images ──
import mimetypes
if path.startswith('/images/'):
filename = path.split('/')[-1]
filepath = IMAGES_DIR / filename
if filepath.exists() and filepath.is_file():
mime = mimetypes.guess_type(str(filepath))[0] or 'image/jpeg'
self.send_response(200)
self.send_header('Content-Type', mime)
self.send_header('Cache-Control', 'public, max-age=86400')
data = filepath.read_bytes()
self.send_header('Content-Length', len(data))
self.end_headers()
self.wfile.write(data)
return
self.send_response(404)
self.end_headers()
return
# ── Auth required routes ──
user = self._require_auth()
if not user:
return
# GET /api/user
if path == '/api/user':
return self._send_json(user)
# GET /api/foods/search?q=...
if path == '/api/foods/search':
query = params.get('q', [''])[0]
limit = int(params.get('limit', ['20'])[0])
if not query:
return self._send_json([])
results = search_foods(query, user['id'], limit)
return self._send_json(results)
# GET /api/foods — list all foods
if path == '/api/foods':
limit = int(params.get('limit', ['100'])[0])
conn = get_db()
rows = conn.execute(
"SELECT * FROM foods WHERE status != 'archived' ORDER BY name COLLATE NOCASE LIMIT ?",
(limit,)
).fetchall()
conn.close()
result = []
for r in rows:
food = dict(r)
food['servings'] = get_food_servings(food['id'])
result.append(food)
return self._send_json(result)
# GET /api/foods/recent
if path == '/api/foods/recent':
limit = int(params.get('limit', ['20'])[0])
return self._send_json(get_recent_foods(user['id'], limit))
# GET /api/foods/frequent
if path == '/api/foods/frequent':
limit = int(params.get('limit', ['20'])[0])
return self._send_json(get_frequent_foods(user['id'], limit))
# GET /api/foods/external?q=... — search OpenFoodFacts + USDA
if path == '/api/foods/external':
query = params.get('q', [''])[0]
limit = int(params.get('limit', ['10'])[0])
if not query:
return self._send_json([])
results = search_external(query, limit)
return self._send_json(results)
# GET /api/foods/barcode/<code> — lookup by barcode
barcode_match = re.match(r'^/api/foods/barcode/(\d+)$', path)
if barcode_match:
barcode = barcode_match.group(1)
# Check local DB first
conn = get_db()
local = conn.execute("SELECT * FROM foods WHERE barcode = ? AND status != 'archived'", (barcode,)).fetchone()
conn.close()
if local:
food = get_food_by_id(local['id'])
return self._send_json({'source': 'local', 'food': food})
# Try OpenFoodFacts
result = lookup_openfoodfacts_barcode(barcode)
if result:
return self._send_json({'source': 'openfoodfacts', 'external': result})
return self._send_json({'error': 'Barcode not found'}, 404)
# GET /api/foods/<id>
food_match = re.match(r'^/api/foods/([a-f0-9-]+)$', path)
if food_match:
food = get_food_by_id(food_match.group(1))
if food:
return self._send_json(food)
return self._send_json({'error': 'Not found'}, 404)
# GET /api/entries?date=...&user_id=...
if path == '/api/entries':
entry_date = params.get('date', [date.today().isoformat()])[0]
target_user = params.get('user_id', [user['id']])[0]
entries = get_entries_by_date(target_user, entry_date)
return self._send_json(entries)
# GET /api/entries/totals?date=...
if path == '/api/entries/totals':
entry_date = params.get('date', [date.today().isoformat()])[0]
target_user = params.get('user_id', [user['id']])[0]
totals = get_daily_totals(target_user, entry_date)
return self._send_json(totals)
# GET /api/goals/for-date?date=...&user_id=...
if path == '/api/goals/for-date':
for_date = params.get('date', [date.today().isoformat()])[0]
target_user = params.get('user_id', [user['id']])[0]
goal = get_goals_for_date(target_user, for_date)
if goal:
return self._send_json(goal)
return self._send_json({'error': 'No active goal found'}, 404)
# GET /api/goals?user_id=...
if path == '/api/goals':
target_user = params.get('user_id', [user['id']])[0]
conn = get_db()
rows = conn.execute(
"SELECT * FROM goals WHERE user_id = ? ORDER BY start_date DESC",
(target_user,)
).fetchall()
conn.close()
return self._send_json([dict(r) for r in rows])
# GET /api/templates
if path == '/api/templates':
conn = get_db()
templates = conn.execute(
"SELECT * FROM meal_templates WHERE user_id = ? AND is_archived = 0 ORDER BY updated_at DESC",
(user['id'],)
).fetchall()
result = []
for t in templates:
t_dict = dict(t)
items = conn.execute(
"SELECT * FROM meal_template_items WHERE template_id = ?", (t['id'],)
).fetchall()
t_dict['items'] = [dict(i) for i in items]
result.append(t_dict)
conn.close()
return self._send_json(result)
# GET /api/favorites
if path == '/api/favorites':
conn = get_db()
rows = conn.execute(
"""SELECT f.* FROM user_favorites uf
JOIN foods f ON uf.food_id = f.id
WHERE uf.user_id = ? AND f.status != 'archived'
ORDER BY uf.created_at DESC""",
(user['id'],)
).fetchall()
conn.close()
return self._send_json([dict(r) for r in rows])
# GET /api/resolution-queue
if path == '/api/resolution-queue':
conn = get_db()
rows = conn.execute(
"""SELECT * FROM food_resolution_queue
WHERE user_id = ? AND resolved_at IS NULL
ORDER BY created_at DESC""",
(user['id'],)
).fetchall()
conn.close()
return self._send_json([dict(r) for r in rows])
# GET /api/users (list all users — for switching view)
if path == '/api/users':
conn = get_db()
rows = conn.execute("SELECT id, username, display_name FROM users").fetchall()
conn.close()
return self._send_json([dict(r) for r in rows])
self._send_json({'error': 'Not found'}, 404)
def do_POST(self):
parsed = urlparse(self.path)
path = parsed.path.rstrip('/')
# ── Login (no auth required) ──
if path == '/api/auth/login':
data = self._read_body()
username = data.get('username', '').strip().lower()
password = data.get('password', '')
conn = get_db()
user = conn.execute(
"SELECT * FROM users WHERE username = ?",
(username,)
).fetchone()
conn.close()
if not user or not bcrypt.checkpw(password.encode(), user['password_hash'].encode()):
return self._send_json({'error': 'Invalid credentials'}, 401)
token = create_session(user['id'])
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Set-Cookie', f'session={token}; Path=/; HttpOnly; SameSite=Lax; Max-Age=2592000')
self.send_header('Access-Control-Allow-Origin', '*')
body = json.dumps({'token': token, 'user': {
'id': user['id'], 'username': user['username'], 'display_name': user['display_name']
}}).encode()
self.send_header('Content-Length', len(body))
self.end_headers()
self.wfile.write(body)
return
# ── Auth required routes ──
user = self._require_auth()
if not user:
return
# POST /api/foods
if path == '/api/foods':
data = self._read_body()
if not data.get('name'):
return self._send_json({'error': 'name is required'}, 400)
food = create_food(data, user['id'])
return self._send_json(food, 201)
# POST /api/foods/split — AI-powered multi-item splitting
if path == '/api/foods/split':
data = self._read_body()
phrase = data.get('phrase', '').strip()
if not phrase:
return self._send_json({'error': 'phrase is required'}, 400)
items = _ai_split_items(phrase)
return self._send_json({'items': items})
# POST /api/foods/resolve
if path == '/api/foods/resolve':
data = self._read_body()
raw_phrase = data.get('raw_phrase', '').strip()
if not raw_phrase:
return self._send_json({'error': 'raw_phrase is required'}, 400)
result = resolve_food(
raw_phrase=raw_phrase,
user_id=user['id'],
meal_type=data.get('meal_type'),
portion_text=data.get('portion_text'),
entry_date=data.get('entry_date'),
source=data.get('source', 'api'),
)
return self._send_json(result)
# POST /api/images/search — Google Image Search for food photos
if path == '/api/images/search':
data = self._read_body()
query = data.get('query', '')
if not query:
return self._send_json({'error': 'query required'}, 400)
images = search_google_images(query, num=int(data.get('num', 6)))
return self._send_json({'images': images})
# POST /api/foods/<id>/image — download and set image from URL
image_set_match = re.match(r'^/api/foods/([a-f0-9-]+)/image$', path)
if image_set_match:
food_id = image_set_match.group(1)
data = self._read_body()
image_url = data.get('url')
if not image_url:
return self._send_json({'error': 'url required'}, 400)
filename = download_food_image(image_url, food_id)
if filename:
return self._send_json({'success': True, 'image_path': filename})
return self._send_json({'success': False, 'error': 'Failed to download image'})
# POST /api/foods/import — import from external nutrition database result
if path == '/api/foods/import':
data = self._read_body()
if not data.get('name') or not data.get('calories_per_100g'):
return self._send_json({'error': 'name and calories_per_100g required'}, 400)
food = import_external_food(data, user['id'])
_audit_log(user['id'], 'food_created', 'food', food['id'], {
'source': data.get('source', 'external'),
'name': data['name'],
'barcode': data.get('barcode'),
})
return self._send_json(food, 201)
# POST /api/foods/merge
if path == '/api/foods/merge':
data = self._read_body()
source_id = data.get('source_id')
target_id = data.get('target_id')
if not source_id or not target_id:
return self._send_json({'error': 'source_id and target_id required'}, 400)
result = merge_foods(source_id, target_id, user['id'])
if 'error' in result:
return self._send_json(result, 400)
return self._send_json(result)
# POST /api/entries
if path == '/api/entries':
data = self._read_body()
if data.get('entry_type') != 'quick_add' and not data.get('food_id'):
return self._send_json({'error': 'food_id is required (or use entry_type=quick_add)'}, 400)
entry = create_food_entry(data, user['id'])
if 'error' in entry:
return self._send_json(entry, 400)
return self._send_json(entry, 201)
# POST /api/favorites
if path == '/api/favorites':
data = self._read_body()
food_id = data.get('food_id')
if not food_id:
return self._send_json({'error': 'food_id required'}, 400)
conn = get_db()
try:
conn.execute("INSERT INTO user_favorites (user_id, food_id) VALUES (?, ?)",
(user['id'], food_id))
conn.commit()
except sqlite3.IntegrityError:
pass # Already a favorite
conn.close()
return self._send_json({'success': True})
# POST /api/templates
if path == '/api/templates':
data = self._read_body()
template_id = str(uuid.uuid4())
conn = get_db()
conn.execute(
"INSERT INTO meal_templates (id, user_id, name, meal_type, is_favorite) VALUES (?, ?, ?, ?, ?)",
(template_id, user['id'], data.get('name', 'Untitled'),
data.get('meal_type'), 1 if data.get('is_favorite') else 0)
)
for item in data.get('items', []):
food = get_food_by_id(item['food_id'])
if food:
nutrition = calculate_entry_nutrition(food, item.get('quantity', 1), item.get('serving_id'))
conn.execute(
"""INSERT INTO meal_template_items
(id, template_id, food_id, quantity, unit, serving_description,
snapshot_food_name, snapshot_calories, snapshot_protein, snapshot_carbs, snapshot_fat)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(str(uuid.uuid4()), template_id, item['food_id'],
item.get('quantity', 1), item.get('unit', 'serving'),
item.get('serving_description'),
food['name'], nutrition['calories'], nutrition['protein'],
nutrition['carbs'], nutrition['fat'])
)
conn.commit()
conn.close()
return self._send_json({'id': template_id, 'name': data.get('name')}, 201)
# POST /api/templates/<id>/log
template_log_match = re.match(r'^/api/templates/([a-f0-9-]+)/log$', path)
if template_log_match:
template_id = template_log_match.group(1)
data = self._read_body()
meal_type = data.get('meal_type', 'snack')
entry_date = data.get('entry_date', date.today().isoformat())
conn = get_db()
items = conn.execute(
"SELECT * FROM meal_template_items WHERE template_id = ?", (template_id,)
).fetchall()
conn.close()
if not items:
return self._send_json({'error': 'Template not found or empty'}, 404)
results = []
for item in items:
entry = create_food_entry({
'food_id': item['food_id'],
'meal_type': meal_type,
'entry_date': entry_date,
'quantity': item['quantity'],
'unit': item['unit'],
'serving_description': item['serving_description'],
'source': 'template',
}, user['id'])
results.append(entry)
return self._send_json({'logged': len(results), 'entries': results})
# POST /api/resolution-queue/<id>/resolve
resolve_match = re.match(r'^/api/resolution-queue/([a-f0-9-]+)/resolve$', path)
if resolve_match:
queue_id = resolve_match.group(1)
data = self._read_body()
action = data.get('action') # matched, created, dismissed
food_id = data.get('food_id')
conn = get_db()
queue_item = conn.execute(
"SELECT * FROM food_resolution_queue WHERE id = ?", (queue_id,)
).fetchone()
if not queue_item:
conn.close()
return self._send_json({'error': 'Queue item not found'}, 404)
conn.execute(
"""UPDATE food_resolution_queue
SET resolved_food_id = ?, resolved_at = ?, resolution_action = ?
WHERE id = ?""",
(food_id, datetime.now().isoformat(), action, queue_id)
)
result = {'success': True, 'action': action}
# If matched or created, also create the food entry
if action in ('matched', 'created') and food_id:
entry = create_food_entry({
'food_id': food_id,
'meal_type': queue_item['meal_type'] or 'snack',
'entry_date': queue_item['entry_date'] or date.today().isoformat(),
'quantity': queue_item['quantity'] or 1.0,
'unit': queue_item['unit'] or 'serving',
'source': queue_item['source'] or 'web',
'entry_method': 'search',
'raw_text': queue_item['raw_text'],
'confidence_score': queue_item['confidence'],
}, user['id'])
result['entry'] = entry
conn.commit()
conn.close()
# Audit trail
_audit_log(user['id'], 'queue_resolved', 'queue', queue_id, {
'action': action, 'food_id': food_id,
'raw_text': queue_item['raw_text'],
'confidence': queue_item['confidence'],
})
return self._send_json(result)
# POST /api/foods/<id>/aliases
alias_match = re.match(r'^/api/foods/([a-f0-9-]+)/aliases$', path)
if alias_match:
food_id = alias_match.group(1)
data = self._read_body()
alias = data.get('alias', '').strip()
if not alias:
return self._send_json({'error': 'alias is required'}, 400)
alias_id = str(uuid.uuid4())
conn = get_db()
try:
conn.execute(
"INSERT INTO food_aliases (id, food_id, alias, alias_normalized) VALUES (?, ?, ?, ?)",
(alias_id, food_id, alias, normalize_food_name(alias))
)
conn.commit()
except sqlite3.IntegrityError:
conn.close()
return self._send_json({'error': 'Alias already exists'}, 409)
conn.close()
return self._send_json({'id': alias_id, 'alias': alias}, 201)
self._send_json({'error': 'Not found'}, 404)
def do_PATCH(self):
parsed = urlparse(self.path)
path = parsed.path.rstrip('/')
user = self._require_auth()
if not user:
return
# PATCH /api/entries/<id>
entry_match = re.match(r'^/api/entries/([a-f0-9-]+)$', path)
if entry_match:
entry_id = entry_match.group(1)
data = self._read_body()
conn = get_db()
entry = conn.execute("SELECT * FROM food_entries WHERE id = ? AND user_id = ?",
(entry_id, user['id'])).fetchone()
if not entry:
conn.close()
return self._send_json({'error': 'Entry not found'}, 404)
updates = []
params = []
# Allow updating these fields
for field in ['meal_type', 'entry_date', 'quantity', 'unit', 'serving_description', 'note']:
if field in data:
updates.append(f"{field} = ?")
params.append(data[field])
# If quantity changed and it's a food entry, recalculate snapshot
if 'quantity' in data and entry['food_id']:
food = get_food_by_id(entry['food_id'])
if food:
quantity = data['quantity']
nutrition = calculate_entry_nutrition(food, quantity)
updates.extend([
'snapshot_calories = ?', 'snapshot_protein = ?',
'snapshot_carbs = ?', 'snapshot_fat = ?'
])
params.extend([nutrition['calories'], nutrition['protein'],
nutrition['carbs'], nutrition['fat']])
if updates:
params.append(entry_id)
conn.execute(f"UPDATE food_entries SET {', '.join(updates)} WHERE id = ?", params)
conn.commit()
updated = conn.execute("SELECT * FROM food_entries WHERE id = ?", (entry_id,)).fetchone()
conn.close()
return self._send_json(dict(updated))
# PATCH /api/foods/<id>
food_match = re.match(r'^/api/foods/([a-f0-9-]+)$', path)
if food_match:
food_id = food_match.group(1)
data = self._read_body()
conn = get_db()
updates = []
params = []
for field in ['name', 'brand', 'barcode', 'notes',
'calories_per_base', 'protein_per_base', 'carbs_per_base', 'fat_per_base',
'base_unit', 'status', 'is_shared']:
if field in data:
if field == 'name':
updates.append('name = ?')
params.append(data['name'])
updates.append('normalized_name = ?')
params.append(normalize_food_name(data['name']))
elif field == 'brand':
updates.append('brand = ?')
params.append(data['brand'])
updates.append('brand_normalized = ?')
params.append(normalize_food_name(data['brand']) if data['brand'] else None)
else:
updates.append(f"{field} = ?")
params.append(data[field])
updates.append("updated_at = ?")
params.append(datetime.now().isoformat())
if updates:
params.append(food_id)
conn.execute(f"UPDATE foods SET {', '.join(updates)} WHERE id = ?", params)
conn.commit()
conn.close()
return self._send_json(get_food_by_id(food_id))
self._send_json({'error': 'Not found'}, 404)
def do_PUT(self):
parsed = urlparse(self.path)
path = parsed.path.rstrip('/')
user = self._require_auth()
if not user:
return
# PUT /api/goals
if path == '/api/goals':
data = self._read_body()
target_user = data.get('user_id', user['id'])
start_date = data.get('start_date', date.today().isoformat())
conn = get_db()
# Deactivate/end previous active goals
conn.execute(
"""UPDATE goals SET end_date = ?, is_active = 0
WHERE user_id = ? AND is_active = 1 AND end_date IS NULL""",
(start_date, target_user)
)
goal_id = str(uuid.uuid4())
conn.execute(
"""INSERT INTO goals (id, user_id, start_date, end_date, calories, protein, carbs, fat, is_active)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1)""",
(goal_id, target_user, start_date, data.get('end_date'),
data.get('calories', 2000), data.get('protein', 150),
data.get('carbs', 200), data.get('fat', 65))
)
conn.commit()
conn.close()
return self._send_json({'id': goal_id, 'start_date': start_date})
self._send_json({'error': 'Not found'}, 404)
def do_DELETE(self):
parsed = urlparse(self.path)
path = parsed.path.rstrip('/')
user = self._require_auth()
if not user:
return
# DELETE /api/entries/<id>
entry_match = re.match(r'^/api/entries/([a-f0-9-]+)$', path)
if entry_match:
entry_id = entry_match.group(1)
conn = get_db()
conn.execute("DELETE FROM food_entries WHERE id = ? AND user_id = ?",
(entry_id, user['id']))
conn.commit()
conn.close()
return self._send_json({'success': True})
# DELETE /api/favorites/<food_id>
fav_match = re.match(r'^/api/favorites/([a-f0-9-]+)$', path)
if fav_match:
food_id = fav_match.group(1)
conn = get_db()
conn.execute("DELETE FROM user_favorites WHERE user_id = ? AND food_id = ?",
(user['id'], food_id))
conn.commit()
conn.close()
return self._send_json({'success': True})
# DELETE /api/templates/<id> (soft delete — archive)
template_match = re.match(r'^/api/templates/([a-f0-9-]+)$', path)
if template_match:
template_id = template_match.group(1)
conn = get_db()
conn.execute("UPDATE meal_templates SET is_archived = 1 WHERE id = ? AND user_id = ?",
(template_id, user['id']))
conn.commit()
conn.close()
return self._send_json({'success': True})
# DELETE /api/foods/<food_id> (soft delete — archive)
food_del_match = re.match(r'^/api/foods/([a-f0-9-]+)$', path)
if food_del_match:
food_id = food_del_match.group(1)
conn = get_db()
conn.execute("UPDATE foods SET status = 'archived', updated_at = ? WHERE id = ?",
(datetime.now().isoformat(), food_id))
conn.commit()
conn.close()
_audit_log(user['id'], 'food_archived', 'food', food_id, {'action': 'deleted by user'})
return self._send_json({'success': True})
# DELETE /api/foods/<food_id>/aliases/<alias_id>
alias_del_match = re.match(r'^/api/foods/([a-f0-9-]+)/aliases/([a-f0-9-]+)$', path)
if alias_del_match:
alias_id = alias_del_match.group(2)
conn = get_db()
conn.execute("DELETE FROM food_aliases WHERE id = ?", (alias_id,))
conn.commit()
conn.close()
return self._send_json({'success': True})
self._send_json({'error': 'Not found'}, 404)
def log_message(self, format, *args):
"""Suppress default logging for cleaner output."""
pass
# ─── Main ────────────────────────────────────────────────────────────────────
if __name__ == '__main__':
print(f"Initializing database at {DB_PATH}...")
init_db()
seed_default_users()
print(f"Starting Calorie Tracker on port {PORT}...")
server = HTTPServer(('0.0.0.0', PORT), CalorieHandler)
try:
server.serve_forever()
except KeyboardInterrupt:
print("\nShutting down.")
server.server_close()