Files
media_crawler/backend/storage.py
renjue 82581d2949 Implement full media crawler workflow with Flask backend and Vue frontend.
Add TMDB search and media detail pages, HDHive resource ingestion flow, unified error handling, Docker single-container runtime, and project docs/config updates for local deployment.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-09 16:16:18 +08:00

183 lines
4.8 KiB
Python

import json
import sqlite3
from pathlib import Path
# Location of the SQLite database file: "media_crawler.db" in the same
# directory as this module (resolved to an absolute path at import time).
DB_PATH = Path(__file__).resolve().parent / "media_crawler.db"
def get_conn():
    """Open and return a new SQLite connection to ``DB_PATH``.

    The connection's row factory is ``sqlite3.Row``, so fetched rows can be
    read by column name and converted with ``dict(row)``.  The caller owns
    the connection and is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the ``tasks``, ``media_items`` and ``task_logs`` tables.

    Every statement is ``CREATE TABLE IF NOT EXISTS``, so calling this on a
    database that already has the tables leaves them untouched.  The
    connection is closed even when one of the statements raises.
    """
    conn = get_conn()
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS tasks (
                task_id TEXT PRIMARY KEY,
                trace_id TEXT NOT NULL,
                status TEXT NOT NULL,
                input_payload TEXT NOT NULL,
                started_at TEXT NOT NULL,
                finished_at TEXT,
                summary TEXT
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS media_items (
                tmdb_id TEXT PRIMARY KEY,
                type TEXT NOT NULL,
                title TEXT,
                year TEXT,
                tmdb_raw TEXT,
                hdhive_raw TEXT,
                cms_id TEXT,
                ingest_status TEXT NOT NULL
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS task_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                task_id TEXT NOT NULL,
                step TEXT NOT NULL,
                level TEXT NOT NULL,
                message TEXT NOT NULL,
                detail TEXT,
                created_at TEXT NOT NULL
            )
            """
        )
        conn.commit()
    finally:
        # Release the database handle on the error path as well.
        conn.close()
def upsert_task(task):
    """Insert a task row, or update an existing row with the same task id.

    Args:
        task: Mapping that must contain ``taskId``, ``traceId``, ``status``,
            ``inputPayload`` and ``startedAt``; ``finishedAt`` and
            ``summary`` are optional.  ``inputPayload`` and ``summary`` are
            stored as JSON text.

    When the ``task_id`` already exists, only ``status``, ``finished_at``
    and ``summary`` are overwritten; ``trace_id``, ``input_payload`` and
    ``started_at`` keep their stored values.

    Raises:
        KeyError: If one of the required keys is missing from ``task``.
    """
    # Build the parameters before opening a connection so that a malformed
    # task fails without ever holding a database handle.
    params = (
        task["taskId"],
        task["traceId"],
        task["status"],
        json.dumps(task["inputPayload"], ensure_ascii=False),
        task["startedAt"],
        task.get("finishedAt"),
        # A missing summary is serialised as the JSON text "null".
        json.dumps(task.get("summary"), ensure_ascii=False),
    )
    conn = get_conn()
    try:
        conn.execute(
            """
            INSERT INTO tasks(task_id, trace_id, status, input_payload, started_at, finished_at, summary)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(task_id) DO UPDATE SET
                status=excluded.status,
                finished_at=excluded.finished_at,
                summary=excluded.summary
            """,
            params,
        )
        conn.commit()
    finally:
        # Always release the handle; an uncommitted write is discarded.
        conn.close()
def insert_log(task_id, step, level, message, detail, created_at):
    """Append one entry to the ``task_logs`` table.

    Args:
        task_id: Identifier of the task the entry belongs to.
        step: Value for the ``step`` column.
        level: Value for the ``level`` column.
        message: Value for the ``message`` column.
        detail: JSON-serialisable object stored as JSON text.
        created_at: Value for the ``created_at`` column.

    Raises:
        TypeError: If ``detail`` cannot be serialised by ``json.dumps``.
    """
    # Serialise first so a bad ``detail`` fails before a connection is opened.
    params = (
        task_id,
        step,
        level,
        message,
        json.dumps(detail, ensure_ascii=False),
        created_at,
    )
    conn = get_conn()
    try:
        conn.execute(
            """
            INSERT INTO task_logs(task_id, step, level, message, detail, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            params,
        )
        conn.commit()
    finally:
        # Always release the handle, including when the insert fails.
        conn.close()
def upsert_media_item(item):
    """Insert a media item, or overwrite the row with the same TMDB id.

    Args:
        item: Mapping that must contain ``tmdbId``, ``type`` and
            ``ingestStatus``; ``title``, ``year``, ``tmdbRaw``,
            ``hdhiveRaw`` and ``cmsId`` are optional.  ``tmdbId`` is
            converted with ``str()``; ``tmdbRaw`` and ``hdhiveRaw`` are
            stored as JSON text.

    On a ``tmdb_id`` conflict every other column is replaced with the new
    values.

    Raises:
        KeyError: If one of the required keys is missing from ``item``.
    """
    # Build the parameters before opening a connection so that a malformed
    # item fails without ever holding a database handle.
    params = (
        str(item["tmdbId"]),
        item["type"],
        item.get("title"),
        item.get("year"),
        # Missing raw payloads are serialised as the JSON text "null".
        json.dumps(item.get("tmdbRaw"), ensure_ascii=False),
        json.dumps(item.get("hdhiveRaw"), ensure_ascii=False),
        item.get("cmsId"),
        item["ingestStatus"],
    )
    conn = get_conn()
    try:
        conn.execute(
            """
            INSERT INTO media_items(tmdb_id, type, title, year, tmdb_raw, hdhive_raw, cms_id, ingest_status)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(tmdb_id) DO UPDATE SET
                type=excluded.type,
                title=excluded.title,
                year=excluded.year,
                tmdb_raw=excluded.tmdb_raw,
                hdhive_raw=excluded.hdhive_raw,
                cms_id=excluded.cms_id,
                ingest_status=excluded.ingest_status
            """,
            params,
        )
        conn.commit()
    finally:
        # Always release the handle; an uncommitted write is discarded.
        conn.close()
def find_media_item(tmdb_id):
    """Look up a single ``media_items`` row by TMDB id.

    Args:
        tmdb_id: TMDB identifier; converted with ``str()`` before the
            lookup, matching how ``tmdb_id`` is stored.

    Returns:
        A dict keyed by column name, or ``None`` when no row matches.
        ``tmdb_raw`` and ``hdhive_raw`` are returned as stored (JSON text),
        not decoded.
    """
    conn = get_conn()
    try:
        row = conn.execute(
            "SELECT * FROM media_items WHERE tmdb_id = ?",
            (str(tmdb_id),),
        ).fetchone()
    finally:
        # Close the handle even if the query raises.
        conn.close()
    return None if row is None else dict(row)
def _task_from_row(row):
    """Convert one ``tasks`` row into a dict with camelCase keys."""
    item = dict(row)
    item["inputPayload"] = json.loads(item.pop("input_payload") or "{}")
    item["startedAt"] = item.pop("started_at")
    item["finishedAt"] = item.pop("finished_at")
    item["taskId"] = item.pop("task_id")
    item["traceId"] = item.pop("trace_id")
    # An empty or NULL summary becomes None instead of being parsed.
    item["summary"] = json.loads(item["summary"]) if item.get("summary") else None
    return item


def list_tasks(limit=50):
    """Return the most recently started tasks, newest first.

    Args:
        limit: Maximum number of rows to return (default 50).

    Returns:
        A list of dicts.  ``input_payload``, ``started_at``, ``finished_at``,
        ``task_id`` and ``trace_id`` are renamed to ``inputPayload``,
        ``startedAt``, ``finishedAt``, ``taskId`` and ``traceId``;
        ``inputPayload`` and ``summary`` are decoded from JSON.  Other
        columns (such as ``status``) are passed through unchanged.
    """
    conn = get_conn()
    try:
        rows = conn.execute(
            "SELECT * FROM tasks ORDER BY started_at DESC LIMIT ?",
            (limit,),
        ).fetchall()
    finally:
        # Close the handle even if the query raises.
        conn.close()
    return [_task_from_row(row) for row in rows]
def _log_from_row(row):
    """Convert one ``task_logs`` row into a dict with camelCase keys."""
    item = dict(row)
    item["taskId"] = item.pop("task_id")
    item["createdAt"] = item.pop("created_at")
    # An empty or NULL detail becomes an empty dict instead of being parsed.
    item["detail"] = json.loads(item["detail"]) if item.get("detail") else {}
    return item


def list_logs(task_id):
    """Return all log entries of a task in insertion order.

    Args:
        task_id: Identifier of the task whose logs are requested.

    Returns:
        A list of dicts with the keys ``step``, ``level``, ``message``,
        ``detail`` (decoded from JSON, ``{}`` when empty), ``taskId`` and
        ``createdAt``, ordered by the auto-incrementing row id.
    """
    conn = get_conn()
    try:
        rows = conn.execute(
            "SELECT task_id, step, level, message, detail, created_at FROM task_logs WHERE task_id = ? ORDER BY id ASC",
            (task_id,),
        ).fetchall()
    finally:
        # Close the handle even if the query raises.
        conn.close()
    return [_log_from_row(row) for row in rows]