Add TMDB search and media detail pages, HDHive resource ingestion flow, unified error handling, Docker single-container runtime, and project docs/config updates for local deployment. Co-authored-by: Cursor <cursoragent@cursor.com>
183 lines
4.8 KiB
Python
183 lines
4.8 KiB
Python
import json
|
|
import sqlite3
|
|
from pathlib import Path
|
|
|
|
# Absolute path of the SQLite database file, kept next to this module.
DB_PATH = Path(__file__).resolve().parent.joinpath("media_crawler.db")


def get_conn():
    """Open and return a new connection to the database at ``DB_PATH``.

    The connection's ``row_factory`` is ``sqlite3.Row``, so fetched rows can
    be indexed by column name and converted with ``dict(row)``. Every call
    returns a fresh connection; the caller is expected to close it.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection


def init_db():
    """Create the database schema if it does not already exist.

    Every statement uses ``IF NOT EXISTS``, so this is safe to call on each
    start-up. Objects created:

    * ``tasks`` -- keyed by ``task_id``; ``input_payload`` and ``summary``
      hold JSON text (written by ``upsert_task``).
    * ``media_items`` -- keyed by ``tmdb_id`` stored as TEXT; ``tmdb_raw``
      and ``hdhive_raw`` hold JSON text (written by ``upsert_media_item``).
    * ``task_logs`` -- log lines keyed by an autoincrement ``id`` and
      filtered by ``task_id`` when read back.
    * ``idx_task_logs_task_id`` -- index backing the per-task lookup on
      ``task_logs``; without it every read scans the whole log table.

    The connection is closed even if one of the statements fails.
    """
    statements = (
        """
        CREATE TABLE IF NOT EXISTS tasks (
            task_id TEXT PRIMARY KEY,
            trace_id TEXT NOT NULL,
            status TEXT NOT NULL,
            input_payload TEXT NOT NULL,
            started_at TEXT NOT NULL,
            finished_at TEXT,
            summary TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS media_items (
            tmdb_id TEXT PRIMARY KEY,
            type TEXT NOT NULL,
            title TEXT,
            year TEXT,
            tmdb_raw TEXT,
            hdhive_raw TEXT,
            cms_id TEXT,
            ingest_status TEXT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS task_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id TEXT NOT NULL,
            step TEXT NOT NULL,
            level TEXT NOT NULL,
            message TEXT NOT NULL,
            detail TEXT,
            created_at TEXT NOT NULL
        )
        """,
        # `id` is the rowid, which every SQLite index carries implicitly, so
        # this single-column index also serves "WHERE task_id = ? ORDER BY id".
        "CREATE INDEX IF NOT EXISTS idx_task_logs_task_id ON task_logs(task_id)",
    )
    conn = get_conn()
    try:
        for statement in statements:
            conn.execute(statement)
        conn.commit()
    finally:
        conn.close()


def upsert_task(task):
    """Insert a task row, or update its mutable columns if it already exists.

    ``task`` is a camelCase dict. ``taskId``, ``traceId``, ``status``,
    ``inputPayload`` and ``startedAt`` are required; ``finishedAt`` and
    ``summary`` are optional. ``inputPayload`` and ``summary`` are stored as
    JSON text, so a missing summary is stored as the JSON literal ``null``.

    On conflict only ``status``, ``finished_at`` and ``summary`` are
    overwritten. ``trace_id``, ``input_payload`` and ``started_at`` keep the
    values from the first insert.

    Raises:
        KeyError: if a required key is missing from ``task``.
        TypeError: if ``inputPayload`` or ``summary`` is not JSON-serialisable.
    """
    # Build (and implicitly validate) the parameters before opening a
    # connection, so a malformed task cannot leave a connection dangling.
    params = (
        task["taskId"],
        task["traceId"],
        task["status"],
        json.dumps(task["inputPayload"], ensure_ascii=False),
        task["startedAt"],
        task.get("finishedAt"),
        json.dumps(task.get("summary"), ensure_ascii=False),
    )
    conn = get_conn()
    try:
        conn.execute(
            """
            INSERT INTO tasks(task_id, trace_id, status, input_payload, started_at, finished_at, summary)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(task_id) DO UPDATE SET
                status=excluded.status,
                finished_at=excluded.finished_at,
                summary=excluded.summary
            """,
            params,
        )
        conn.commit()
    finally:
        conn.close()


def insert_log(task_id, step, level, message, detail, created_at):
    """Append one log row for ``task_id`` to ``task_logs``.

    ``detail`` is stored as JSON text, so ``None`` is stored as the JSON
    literal ``null``. ``created_at`` is stored verbatim; rows are later read
    back in insertion (``id``) order.

    Raises:
        TypeError: if ``detail`` is not JSON-serialisable.
    """
    # Serialise first: a json.dumps failure must not leak an open connection.
    params = (
        task_id,
        step,
        level,
        message,
        json.dumps(detail, ensure_ascii=False),
        created_at,
    )
    conn = get_conn()
    try:
        conn.execute(
            """
            INSERT INTO task_logs(task_id, step, level, message, detail, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            params,
        )
        conn.commit()
    finally:
        conn.close()


def upsert_media_item(item):
    """Insert a media item, or overwrite every column if it already exists.

    ``item`` is a camelCase dict. ``tmdbId``, ``type`` and ``ingestStatus``
    are required; ``title``, ``year``, ``tmdbRaw``, ``hdhiveRaw`` and
    ``cmsId`` are optional. ``tmdbId`` is converted with ``str()`` so integer
    and string ids address the same row, matching ``find_media_item``.
    ``tmdbRaw`` and ``hdhiveRaw`` are stored as JSON text, so a missing value
    is stored as the JSON literal ``null``.

    Raises:
        KeyError: if a required key is missing from ``item``.
        TypeError: if ``tmdbRaw`` or ``hdhiveRaw`` is not JSON-serialisable.
    """
    # Build the parameters before connecting so bad input cannot leak a connection.
    params = (
        str(item["tmdbId"]),
        item["type"],
        item.get("title"),
        item.get("year"),
        json.dumps(item.get("tmdbRaw"), ensure_ascii=False),
        json.dumps(item.get("hdhiveRaw"), ensure_ascii=False),
        item.get("cmsId"),
        item["ingestStatus"],
    )
    conn = get_conn()
    try:
        conn.execute(
            """
            INSERT INTO media_items(tmdb_id, type, title, year, tmdb_raw, hdhive_raw, cms_id, ingest_status)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(tmdb_id) DO UPDATE SET
                type=excluded.type,
                title=excluded.title,
                year=excluded.year,
                tmdb_raw=excluded.tmdb_raw,
                hdhive_raw=excluded.hdhive_raw,
                cms_id=excluded.cms_id,
                ingest_status=excluded.ingest_status
            """,
            params,
        )
        conn.commit()
    finally:
        conn.close()


def find_media_item(tmdb_id):
    """Return the ``media_items`` row for ``tmdb_id`` as a dict, or ``None``.

    ``tmdb_id`` is converted with ``str()`` to match how ids are stored. The
    returned dict uses the table's snake_case column names. ``tmdb_raw`` and
    ``hdhive_raw`` are returned as stored JSON text, not decoded.
    """
    conn = get_conn()
    try:
        row = conn.execute(
            "SELECT * FROM media_items WHERE tmdb_id = ?",
            (str(tmdb_id),),
        ).fetchone()
    finally:
        # Close even if the query fails (e.g. schema not initialised yet).
        conn.close()
    return dict(row) if row is not None else None


def list_tasks(limit=50):
    """Return up to ``limit`` tasks, most recently started first.

    Each task is a dict with camelCase keys (``taskId``, ``traceId``,
    ``inputPayload``, ``startedAt``, ``finishedAt``) plus ``status`` and
    ``summary``. ``inputPayload`` and ``summary`` are decoded from JSON:
    an empty payload becomes ``{}``, and an empty summary becomes ``None``.
    Ordering compares ``started_at`` as stored text.
    """
    conn = get_conn()
    try:
        rows = conn.execute(
            "SELECT * FROM tasks ORDER BY started_at DESC LIMIT ?",
            (limit,),
        ).fetchall()
    finally:
        conn.close()

    result = []
    for row in rows:
        item = dict(row)
        item["inputPayload"] = json.loads(item.pop("input_payload") or "{}")
        item["startedAt"] = item.pop("started_at")
        item["finishedAt"] = item.pop("finished_at")
        item["taskId"] = item.pop("task_id")
        item["traceId"] = item.pop("trace_id")
        item["summary"] = json.loads(item["summary"]) if item.get("summary") else None
        result.append(item)
    return result


def list_logs(task_id):
    """Return all log entries for ``task_id`` in insertion (``id``) order.

    Each entry is a dict with ``step``, ``level``, ``message``, ``detail``,
    ``taskId`` and ``createdAt``. ``detail`` is decoded from JSON and
    defaults to ``{}`` when nothing meaningful was stored. This covers a NULL
    column, an empty string, and the JSON literal ``null``, which is what
    ``json.dumps(None)`` writes for a ``None`` detail.
    """
    conn = get_conn()
    try:
        rows = conn.execute(
            "SELECT task_id, step, level, message, detail, created_at FROM task_logs WHERE task_id = ? ORDER BY id ASC",
            (task_id,),
        ).fetchall()
    finally:
        conn.close()

    logs = []
    for row in rows:
        item = dict(row)
        item["taskId"] = item.pop("task_id")
        item["createdAt"] = item.pop("created_at")
        raw_detail = item.get("detail")
        detail = json.loads(raw_detail) if raw_detail else None
        # The stored text "null" is truthy but decodes to None; normalise it
        # to the same {} default as a missing detail.
        item["detail"] = {} if detail is None else detail
        logs.append(item)
    return logs