"""SQLite persistence for crawler tasks, their step logs, and media items.

JSON-valued columns (task input/summary, log detail, raw TMDB/HDHive
payloads) are stored as text produced by ``json.dumps(..., ensure_ascii=False)``.
Note that a Python ``None`` passed through ``json.dumps`` is stored as the
text ``"null"`` rather than SQL ``NULL``.
"""

import json
import sqlite3
from contextlib import contextmanager
from pathlib import Path

DB_PATH = Path(__file__).resolve().parent / "media_crawler.db"


def get_conn():
    """Open a new connection to ``DB_PATH`` with rows accessible by column name."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


@contextmanager
def _connection():
    """Yield a fresh connection, commit if the body succeeds, and always close it.

    ``sqlite3.Connection`` used as a context manager only commits/rolls back;
    it does not close. Closing in ``finally`` ensures an exception raised while
    building parameters or executing SQL does not leak the connection. On
    error, no commit happens, so pending changes are discarded on close.
    """
    conn = get_conn()
    try:
        yield conn
        conn.commit()
    finally:
        conn.close()


def init_db():
    """Create the ``tasks``, ``media_items`` and ``task_logs`` tables if missing.

    Safe to call repeatedly: every statement uses ``IF NOT EXISTS``.
    """
    with _connection() as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS tasks (
                task_id TEXT PRIMARY KEY,
                trace_id TEXT NOT NULL,
                status TEXT NOT NULL,
                input_payload TEXT NOT NULL,
                started_at TEXT NOT NULL,
                finished_at TEXT,
                summary TEXT
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS media_items (
                tmdb_id TEXT PRIMARY KEY,
                type TEXT NOT NULL,
                title TEXT,
                year TEXT,
                tmdb_raw TEXT,
                hdhive_raw TEXT,
                cms_id TEXT,
                ingest_status TEXT NOT NULL
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS task_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                task_id TEXT NOT NULL,
                step TEXT NOT NULL,
                level TEXT NOT NULL,
                message TEXT NOT NULL,
                detail TEXT,
                created_at TEXT NOT NULL
            )
            """
        )
        # list_logs() filters by task_id; without an index that is a full scan.
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_task_logs_task_id ON task_logs(task_id)"
        )


def upsert_task(task):
    """Insert a task row, or update an existing one with the same ``taskId``.

    Args:
        task: dict with keys ``taskId``, ``traceId``, ``status``,
            ``inputPayload`` (JSON-serialisable) and ``startedAt``; optional
            ``finishedAt`` and ``summary`` (JSON-serialisable).

    On conflict, only ``status``, ``finished_at`` and ``summary`` are updated.
    ``trace_id``, ``input_payload`` and ``started_at`` keep their first values.

    Raises:
        KeyError: if a required key is missing from ``task``.
    """
    params = (
        task["taskId"],
        task["traceId"],
        task["status"],
        json.dumps(task["inputPayload"], ensure_ascii=False),
        task["startedAt"],
        task.get("finishedAt"),
        json.dumps(task.get("summary"), ensure_ascii=False),
    )
    with _connection() as conn:
        conn.execute(
            """
            INSERT INTO tasks(task_id, trace_id, status, input_payload, started_at, finished_at, summary)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(task_id) DO UPDATE SET
                status=excluded.status,
                finished_at=excluded.finished_at,
                summary=excluded.summary
            """,
            params,
        )


def insert_log(task_id, step, level, message, detail, created_at):
    """Append one log entry for ``task_id``.

    ``detail`` is stored JSON-encoded. ``None`` becomes the text ``"null"``,
    which list_logs() decodes back to ``None``.
    """
    params = (
        task_id,
        step,
        level,
        message,
        json.dumps(detail, ensure_ascii=False),
        created_at,
    )
    with _connection() as conn:
        conn.execute(
            """
            INSERT INTO task_logs(task_id, step, level, message, detail, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            params,
        )


def upsert_media_item(item):
    """Insert or fully overwrite the media item keyed by ``str(item["tmdbId"])``.

    Args:
        item: dict with required keys ``tmdbId``, ``type`` and
            ``ingestStatus``; optional ``title``, ``year``, ``tmdbRaw``,
            ``hdhiveRaw`` (both JSON-serialisable) and ``cmsId``.

    Raises:
        KeyError: if a required key is missing from ``item``.
    """
    params = (
        str(item["tmdbId"]),
        item["type"],
        item.get("title"),
        item.get("year"),
        json.dumps(item.get("tmdbRaw"), ensure_ascii=False),
        json.dumps(item.get("hdhiveRaw"), ensure_ascii=False),
        item.get("cmsId"),
        item["ingestStatus"],
    )
    with _connection() as conn:
        conn.execute(
            """
            INSERT INTO media_items(tmdb_id, type, title, year, tmdb_raw, hdhive_raw, cms_id, ingest_status)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(tmdb_id) DO UPDATE SET
                type=excluded.type,
                title=excluded.title,
                year=excluded.year,
                tmdb_raw=excluded.tmdb_raw,
                hdhive_raw=excluded.hdhive_raw,
                cms_id=excluded.cms_id,
                ingest_status=excluded.ingest_status
            """,
            params,
        )


def find_media_item(tmdb_id):
    """Return the ``media_items`` row for ``tmdb_id`` as a dict, or ``None``.

    Keys are the raw snake_case column names. JSON columns (``tmdb_raw``,
    ``hdhive_raw``) are returned still encoded as text.
    """
    with _connection() as conn:
        row = conn.execute(
            "SELECT * FROM media_items WHERE tmdb_id = ?",
            (str(tmdb_id),),
        ).fetchone()
    return dict(row) if row else None


def _task_from_row(row):
    """Convert a ``tasks`` row into the camelCase dict shape used by callers."""
    item = dict(row)
    item["inputPayload"] = json.loads(item.pop("input_payload") or "{}")
    item["startedAt"] = item.pop("started_at")
    item["finishedAt"] = item.pop("finished_at")
    item["taskId"] = item.pop("task_id")
    item["traceId"] = item.pop("trace_id")
    item["summary"] = json.loads(item["summary"]) if item.get("summary") else None
    return item


def list_tasks(limit=50):
    """Return up to ``limit`` tasks, most recently started first.

    ``started_at`` is ordered as text, so chronological order relies on
    callers using a sortable timestamp format (e.g. ISO 8601).
    """
    with _connection() as conn:
        rows = conn.execute(
            "SELECT * FROM tasks ORDER BY started_at DESC LIMIT ?",
            (limit,),
        ).fetchall()
    return [_task_from_row(row) for row in rows]


def _log_from_row(row):
    """Convert a ``task_logs`` row into the camelCase dict shape used by callers."""
    item = dict(row)
    item["taskId"] = item.pop("task_id")
    item["createdAt"] = item.pop("created_at")
    # SQL NULL / empty detail -> {}; stored JSON "null" decodes to None.
    item["detail"] = json.loads(item["detail"]) if item.get("detail") else {}
    return item


def list_logs(task_id):
    """Return all log entries for ``task_id`` in insertion order."""
    with _connection() as conn:
        rows = conn.execute(
            "SELECT task_id, step, level, message, detail, created_at FROM task_logs WHERE task_id = ? ORDER BY id ASC",
            (task_id,),
        ).fetchall()
    return [_log_from_row(row) for row in rows]