Implement full media crawler workflow with Flask backend and Vue frontend.

Add TMDB search and media detail pages, HDHive resource ingestion flow, unified error handling, Docker single-container runtime, and project docs/config updates for local deployment.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
renjue
2026-05-09 16:16:18 +08:00
parent d3550bf79b
commit 82581d2949
49 changed files with 4959 additions and 0 deletions

24
backend/.env.example Normal file
View File

@@ -0,0 +1,24 @@
FLASK_RUN_PORT=14620
FLASK_DEBUG=1
TMDB_BASE_URL=https://api.themoviedb.org/3
TMDB_TOKEN=
HDHIVE_BASE_URL=https://hdhive.com
HDHIVE_API_KEY=
HDHIVE_ACCESS_TOKEN=
CMS_BASE_URL=
CMS_TOKEN=
CMS_LOGIN_URL=
CMS_ADD_SHARE_URL=
CMS_USERNAME=
CMS_PASSWORD=
EMBY_BASE_URL=
EMBY_TOKEN=
TLS_INSECURE_SKIP_VERIFY=0
MAX_RETRY=3
RETRY_DELAY_MS=500

View File

View File

@@ -0,0 +1,146 @@
import time
from config import Config
from http_client import request_json
from error_handling import AppServiceError
_CMS_TOKEN_CACHE = {"token": "", "expires_at": 0}
def _resolve_login_url():
if Config.CMS_LOGIN_URL:
return Config.CMS_LOGIN_URL
if Config.CMS_BASE_URL:
return f"{Config.CMS_BASE_URL.rstrip('/')}/api/auth/login"
raise AppServiceError(
"CMS login url is not configured",
category="validation",
code="CMS_CONFIG_MISSING",
provider="cms",
)
def _resolve_add_share_url():
if Config.CMS_ADD_SHARE_URL:
return Config.CMS_ADD_SHARE_URL
if Config.CMS_BASE_URL:
return f"{Config.CMS_BASE_URL.rstrip('/')}/api/cloud/add_share_down"
raise AppServiceError(
"CMS add share url is not configured",
category="validation",
code="CMS_CONFIG_MISSING",
provider="cms",
)
def _headers(token):
headers = {"Content-Type": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
return headers
def _extract_cms_token(login_result):
data = login_result.get("data") or {}
token = (data.get("data") or {}).get("token")
if not token:
raise AppServiceError(
"CMS login succeeded but token missing",
category="upstream",
code="CMS_TOKEN_MISSING",
provider="cms",
detail={"response": data},
)
return token
def _login_and_get_token():
if Config.CMS_TOKEN:
return Config.CMS_TOKEN
if not Config.CMS_USERNAME or not Config.CMS_PASSWORD:
raise AppServiceError(
"CMS username/password is required when CMS_TOKEN is not provided",
category="validation",
code="CMS_CONFIG_MISSING",
provider="cms",
)
login_result = request_json(
_resolve_login_url(),
method="POST",
payload={"username": Config.CMS_USERNAME, "password": Config.CMS_PASSWORD},
headers={"Content-Type": "application/json"},
max_retry=Config.MAX_RETRY,
retry_delay_ms=Config.RETRY_DELAY_MS,
provider="cms",
)
token = _extract_cms_token(login_result)
_CMS_TOKEN_CACHE["token"] = token
_CMS_TOKEN_CACHE["expires_at"] = int(time.time()) + 3500
return token
def _get_cached_token():
if Config.CMS_TOKEN:
return Config.CMS_TOKEN
if _CMS_TOKEN_CACHE["token"] and _CMS_TOKEN_CACHE["expires_at"] > int(time.time()):
return _CMS_TOKEN_CACHE["token"]
return _login_and_get_token()
def _should_refresh_token(response_data):
code = (response_data or {}).get("code")
msg = (response_data or {}).get("msg") or (response_data or {}).get("message") or ""
return code != 200 and msg != "提取分享链接失败"
def _add_share(url_value, token):
return request_json(
_resolve_add_share_url(),
method="POST",
payload={"url": url_value},
headers=_headers(token),
max_retry=Config.MAX_RETRY,
retry_delay_ms=Config.RETRY_DELAY_MS,
provider="cms",
)
def create_resource(payload):
resource = payload.get("resource") or {}
share_url = resource.get("unlockUrl") or payload.get("url")
if not share_url:
raise AppServiceError(
"CMS ingest requires unlockUrl",
category="validation",
code="CMS_INPUT_INVALID",
provider="cms",
)
token = _get_cached_token()
first_result = _add_share(share_url, token)
first_data = first_result.get("data") or {}
if _should_refresh_token(first_data):
refreshed_token = _login_and_get_token()
second_result = _add_share(share_url, refreshed_token)
second_data = second_result.get("data") or {}
if (second_data.get("code") or 0) != 200:
raise AppServiceError(
second_data.get("msg") or second_data.get("message") or "CMS ingest failed",
category="upstream",
code=str(second_data.get("code") or "CMS_INGEST_FAILED"),
provider="cms",
detail={"response": second_data},
)
return second_result
if (first_data.get("code") or 0) != 200:
raise AppServiceError(
first_data.get("msg") or first_data.get("message") or "CMS ingest failed",
category="business_rule"
if (first_data.get("msg") == "提取分享链接失败")
else "upstream",
code=str(first_data.get("code") or "CMS_INGEST_FAILED"),
provider="cms",
detail={"response": first_data},
)
return first_result

View File

@@ -0,0 +1,23 @@
from config import Config
from http_client import request_json
def _headers():
headers = {"Content-Type": "application/json"}
if Config.EMBY_TOKEN:
headers["X-Emby-Token"] = Config.EMBY_TOKEN
return headers
def exists_by_tmdb_id(tmdb_id):
url = f"{Config.EMBY_BASE_URL}/Items?AnyProviderIdEquals=Tmdb.{tmdb_id}&Recursive=true&Limit=1"
result = request_json(
url,
headers=_headers(),
max_retry=Config.MAX_RETRY,
retry_delay_ms=Config.RETRY_DELAY_MS,
provider="emby",
)
total = ((result.get("data") or {}).get("TotalRecordCount")) or 0
result["exists"] = total > 0
return result

View File

@@ -0,0 +1,60 @@
from config import Config
from http_client import request_json
def _headers():
headers = {
"Accept": "application/json",
"X-API-Key": Config.HDHIVE_API_KEY,
}
if Config.HDHIVE_ACCESS_TOKEN:
headers["Authorization"] = f"Bearer {Config.HDHIVE_ACCESS_TOKEN}"
return headers
def search_resource(media_type, tmdb_id):
url = f"{Config.HDHIVE_BASE_URL}/api/open/resources/{media_type}/{tmdb_id}"
return request_json(
url,
headers=_headers(),
max_retry=Config.MAX_RETRY,
retry_delay_ms=Config.RETRY_DELAY_MS,
provider="hdhive",
)
def unlock_link(slug):
url = f"{Config.HDHIVE_BASE_URL}/api/open/resources/unlock"
return request_json(
url,
method="POST",
payload={"slug": slug},
headers={**_headers(), "Content-Type": "application/json"},
max_retry=Config.MAX_RETRY,
retry_delay_ms=Config.RETRY_DELAY_MS,
provider="hdhive",
)
def normalize_resource(search_data, unlock_data):
resolution = (search_data or {}).get("video_resolution")
source = (search_data or {}).get("source")
subtitle_language = (search_data or {}).get("subtitle_language")
return {
"resourceTitle": (search_data or {}).get("title", ""),
"quality": ", ".join(resolution) if isinstance(resolution, list) else "",
"size": (search_data or {}).get("share_size", ""),
"diskType": (search_data or {}).get("pan_type", ""),
"source": ", ".join(source) if isinstance(source, list) else "",
"subtitleLanguage": ", ".join(subtitle_language)
if isinstance(subtitle_language, list)
else "",
"slug": (search_data or {}).get("slug", ""),
"unlockUrl": (unlock_data or {}).get("full_url")
or (unlock_data or {}).get("url")
or "",
"availability": "available"
if ((unlock_data or {}).get("full_url") or (unlock_data or {}).get("url"))
else "unknown",
"raw": {"searchData": search_data, "unlockData": unlock_data},
}

View File

@@ -0,0 +1,56 @@
from config import Config
from http_client import request_json
from urllib.parse import quote
def _headers():
headers = {"Content-Type": "application/json"}
if Config.TMDB_TOKEN:
headers["Authorization"] = f"Bearer {Config.TMDB_TOKEN}"
return headers
def search_media(query, media_type="movie", page=1):
normalized_type = "tv" if media_type == "tv" else "movie"
url = (
f"{Config.TMDB_BASE_URL}/search/{normalized_type}"
f"?language=zh-CN&query={quote(str(query))}&page={page}"
)
result = request_json(
url,
headers=_headers(),
max_retry=Config.MAX_RETRY,
retry_delay_ms=Config.RETRY_DELAY_MS,
provider="tmdb",
)
data = result.get("data") or {}
result["items"] = data.get("results") if isinstance(data, dict) else []
return result
def get_media_detail(tmdb_id, media_type):
normalized_type = "tv" if media_type == "tv" else "movie"
url = f"{Config.TMDB_BASE_URL}/{normalized_type}/{tmdb_id}?language=zh-CN"
result = request_json(
url,
headers=_headers(),
max_retry=Config.MAX_RETRY,
retry_delay_ms=Config.RETRY_DELAY_MS,
provider="tmdb",
)
data = result.get("data") or {}
normalized = {
"tmdbId": tmdb_id,
"type": normalized_type,
"title": data.get("title") or data.get("name") or "",
"originalTitle": data.get("original_title") or data.get("original_name") or "",
"overview": data.get("overview") or "",
"year": (data.get("release_date") or data.get("first_air_date") or "")[:4],
"rating": data.get("vote_average"),
"posterPath": data.get("poster_path") or "",
"genres": [g.get("name") for g in data.get("genres", []) if g.get("name")],
"seasons": len(data.get("seasons", [])) if isinstance(data.get("seasons"), list) else 0,
"raw": data,
}
result["normalized"] = normalized
return result

102
backend/app.py Normal file
View File

@@ -0,0 +1,102 @@
from flask import Flask, jsonify, request
from flask_cors import CORS
from config import Config
from error_handling import AppServiceError, normalize_exception
from services.media_service import (
get_media_resources,
search_media_by_keyword,
validate_media_query,
validate_media_type,
)
from services.orchestrator import run_ingest_task
from storage import init_db, list_logs, list_tasks
def create_app():
def error_response(error, fallback_status=400):
normalized = normalize_exception(error)
status = normalized.status or fallback_status
return jsonify({"error": normalized.to_dict()}), status
app = Flask(__name__)
CORS(app)
init_db()
@app.get("/api/health")
def health():
return jsonify({"ok": True})
@app.post("/api/tasks")
def create_task():
data = request.get_json(silent=True) or {}
tmdb_id = str(data.get("tmdbId", "")).strip()
media_type = data.get("type", "movie")
keyword = str(data.get("keyword", "")).strip()
if not tmdb_id:
return error_response(
AppServiceError(
"tmdbId is required",
category="validation",
code="INVALID_INPUT",
status=400,
provider="api",
),
400,
)
if media_type not in ("movie", "tv"):
return error_response(
AppServiceError(
"type must be movie or tv",
category="validation",
code="INVALID_INPUT",
status=400,
provider="api",
),
400,
)
task = run_ingest_task(
{
"tmdbId": tmdb_id,
"type": media_type,
"keyword": keyword,
"slug": str(data.get("slug", "")).strip(),
}
)
return jsonify(task)
@app.get("/api/media/search")
def search_media_handler():
query = str(request.args.get("query", "")).strip()
media_type = str(request.args.get("type", "movie")).strip()
try:
validate_media_query(query, media_type)
return jsonify(search_media_by_keyword(query, media_type))
except Exception as error:
return error_response(error)
@app.get("/api/media/<media_type>/<tmdb_id>")
def media_detail_handler(media_type, tmdb_id):
try:
validate_media_type(media_type)
return jsonify(get_media_resources(media_type, str(tmdb_id)))
except Exception as error:
return error_response(error)
@app.get("/api/tasks")
def get_tasks():
return jsonify({"items": list_tasks()})
@app.get("/api/tasks/<task_id>/logs")
def get_task_logs(task_id):
return jsonify({"items": list_logs(task_id)})
return app
app = create_app()
if __name__ == "__main__":
app.run(host="0.0.0.0", port=Config.FLASK_RUN_PORT, debug=Config.FLASK_DEBUG)

31
backend/config.py Normal file
View File

@@ -0,0 +1,31 @@
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
FLASK_RUN_PORT = int(os.getenv("FLASK_RUN_PORT", "14620"))
FLASK_DEBUG = os.getenv("FLASK_DEBUG", "1") == "1"
TMDB_BASE_URL = os.getenv("TMDB_BASE_URL", "https://api.themoviedb.org/3")
TMDB_TOKEN = os.getenv("TMDB_TOKEN", "")
HDHIVE_BASE_URL = os.getenv("HDHIVE_BASE_URL", "https://hdhive.com")
HDHIVE_API_KEY = os.getenv("HDHIVE_API_KEY", "")
HDHIVE_ACCESS_TOKEN = os.getenv("HDHIVE_ACCESS_TOKEN", "")
CMS_BASE_URL = os.getenv("CMS_BASE_URL", "")
CMS_TOKEN = os.getenv("CMS_TOKEN", "")
CMS_LOGIN_URL = os.getenv("CMS_LOGIN_URL", "")
CMS_ADD_SHARE_URL = os.getenv("CMS_ADD_SHARE_URL", "")
CMS_USERNAME = os.getenv("CMS_USERNAME", "")
CMS_PASSWORD = os.getenv("CMS_PASSWORD", "")
EMBY_BASE_URL = os.getenv("EMBY_BASE_URL", "")
EMBY_TOKEN = os.getenv("EMBY_TOKEN", "")
TLS_INSECURE_SKIP_VERIFY = os.getenv("TLS_INSECURE_SKIP_VERIFY", "0") == "1"
MAX_RETRY = int(os.getenv("MAX_RETRY", "3"))
RETRY_DELAY_MS = int(os.getenv("RETRY_DELAY_MS", "500"))

64
backend/error_handling.py Normal file
View File

@@ -0,0 +1,64 @@
class AppServiceError(Exception):
def __init__(
self,
message,
*,
category="internal",
code="INTERNAL_ERROR",
status=None,
retryable=False,
provider="system",
detail=None,
):
super().__init__(message)
self.category = category
self.code = code
self.status = status
self.retryable = retryable
self.provider = provider
self.detail = detail or {}
def to_dict(self):
return {
"message": str(self),
"category": self.category,
"code": self.code,
"status": self.status,
"retryable": self.retryable,
"provider": self.provider,
"detail": self.detail,
}
def classify_http_error(status, code, retryable=False):
code = str(code or "").upper()
if status == 429 or code in {"RATE_LIMIT_EXCEEDED"}:
return "rate_limit"
if status == 401 or code in {
"MISSING_API_KEY",
"INVALID_API_KEY",
"DISABLED_API_KEY",
"EXPIRED_API_KEY",
"INVALID_OPENAPI_USER_TOKEN",
"OPENAPI_TOKEN_APP_MISMATCH",
}:
return "authentication"
if status == 403 or code in {"OPENAPI_USER_REQUIRED", "SCOPE_NOT_ALLOWED", "USER_SCOPE_NOT_ALLOWED"}:
return "authorization"
if status == 404:
return "not_found"
if status == 400:
return "validation"
if status == 402 or code in {"INSUFFICIENT_POINTS", "VIP_REQUIRED"}:
return "business_rule"
if status and status >= 500:
return "upstream"
if retryable:
return "upstream"
return "unknown"
def normalize_exception(error):
if isinstance(error, AppServiceError):
return error
return AppServiceError(str(error), category="internal", code="INTERNAL_ERROR")

96
backend/http_client.py Normal file
View File

@@ -0,0 +1,96 @@
import time
import requests
from config import Config
from error_handling import AppServiceError, classify_http_error
RETRYABLE_STATUS = {408, 425, 429, 500, 502, 503, 504}
if Config.TLS_INSECURE_SKIP_VERIFY:
requests.packages.urllib3.disable_warnings() # type: ignore[attr-defined]
def request_json(
url,
method="GET",
headers=None,
payload=None,
max_retry=3,
retry_delay_ms=500,
provider="external",
):
attempt = 0
headers = headers or {}
while attempt <= max_retry:
start = time.perf_counter()
try:
response = requests.request(
method=method,
url=url,
headers=headers,
json=payload,
timeout=20,
verify=not Config.TLS_INSECURE_SKIP_VERIFY,
)
cost_ms = round((time.perf_counter() - start) * 1000)
data = None
if response.text:
try:
data = response.json()
except ValueError:
data = response.text
if not response.ok:
retryable = response.status_code in RETRYABLE_STATUS
code = str((data or {}).get("code", response.status_code)) if isinstance(data, dict) else str(response.status_code)
message = (data or {}).get("message", f"HTTP {response.status_code}") if isinstance(data, dict) else f"HTTP {response.status_code}"
category = classify_http_error(response.status_code, code, retryable=retryable)
raise AppServiceError(
message,
category=category,
code=code,
status=response.status_code,
retryable=retryable,
provider=provider,
detail={"data": data, "costMs": cost_ms, "url": url},
)
return {"data": data, "status": response.status_code, "cost_ms": cost_ms}
except requests.Timeout as error:
retryable = True
if not retryable or attempt == max_retry:
raise AppServiceError(
"request timeout",
category="timeout",
code="REQUEST_TIMEOUT",
status=None,
retryable=True,
provider=provider,
detail={"url": url, "reason": str(error)},
) from error
delay = (2**attempt) * retry_delay_ms / 1000.0
time.sleep(delay)
attempt += 1
except requests.RequestException as error:
retryable = True
if attempt == max_retry:
raise AppServiceError(
"network request failed",
category="network",
code="NETWORK_ERROR",
status=None,
retryable=True,
provider=provider,
detail={"url": url, "reason": str(error)},
) from error
delay = (2**attempt) * retry_delay_ms / 1000.0
time.sleep(delay)
attempt += 1
except AppServiceError as error:
retryable = error.retryable
if not retryable or attempt == max_retry:
raise
delay = (2**attempt) * retry_delay_ms / 1000.0
time.sleep(delay)
attempt += 1

4
backend/requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
Flask==3.0.3
Flask-Cors==4.0.1
python-dotenv==1.0.1
requests==2.32.3

View File

View File

@@ -0,0 +1,73 @@
from adapters.hdhive_adapter import normalize_resource, search_resource, unlock_link
from adapters.tmdb_adapter import get_media_detail, search_media
from error_handling import AppServiceError
def search_media_by_keyword(query, media_type):
result = search_media(query, media_type)
raw_items = result.get("items") or []
items = []
for item in raw_items:
items.append(
{
"id": item.get("id"),
"type": media_type,
"title": item.get("title") or item.get("name"),
"overview": item.get("overview") or "",
"posterPath": item.get("poster_path") or "",
"releaseDate": item.get("release_date") or item.get("first_air_date") or "",
"voteAverage": item.get("vote_average"),
}
)
return {"items": items}
def get_media_resources(media_type, tmdb_id):
detail = get_media_detail(tmdb_id, media_type)
hdhive = search_resource(media_type, tmdb_id)
search_data = hdhive.get("data") or []
if isinstance(search_data, dict):
search_data = search_data.get("items") or []
resources = []
for item in search_data:
slug = (item or {}).get("slug")
unlock_data = {}
unlock_error = None
if slug:
try:
unlock = unlock_link(slug)
unlock_data = unlock.get("data") or {}
except Exception as error:
unlock_error = str(error)
normalized = normalize_resource(item, unlock_data)
normalized["unlockError"] = unlock_error
resources.append(normalized)
return {
"media": detail.get("normalized"),
"resources": resources,
}
def validate_media_query(query, media_type):
if not query:
raise AppServiceError(
"query is required",
category="validation",
code="INVALID_INPUT",
status=400,
provider="api",
)
validate_media_type(media_type)
def validate_media_type(media_type):
if media_type not in ("movie", "tv"):
raise AppServiceError(
"type must be movie or tv",
category="validation",
code="INVALID_INPUT",
status=400,
provider="api",
)

View File

@@ -0,0 +1,176 @@
import time
from datetime import datetime
from adapters.cms_adapter import create_resource
from adapters.emby_adapter import exists_by_tmdb_id
from adapters.hdhive_adapter import normalize_resource, search_resource, unlock_link
from adapters.tmdb_adapter import get_media_detail
from error_handling import AppServiceError, normalize_exception
from storage import find_media_item, insert_log, upsert_media_item, upsert_task
def now_iso():
return datetime.utcnow().isoformat() + "Z"
def new_task_id():
return f"task_{int(time.time() * 1000)}"
def log(task_id, step, level, message, detail=None):
insert_log(task_id, step, level, message, detail or {}, now_iso())
def run_ingest_task(payload):
task_id = new_task_id()
trace_id = f"{task_id}_{hex(int(time.time() * 1000))[-6:]}"
task = {
"taskId": task_id,
"traceId": trace_id,
"status": "RUNNING",
"inputPayload": payload,
"startedAt": now_iso(),
"finishedAt": None,
"summary": None,
}
upsert_task(task)
log(task_id, "START", "INFO", "任务开始", {"payload": payload, "traceId": trace_id})
try:
deduped = find_media_item(payload["tmdbId"])
if deduped and deduped.get("ingest_status") == "SUCCESS":
task["status"] = "SUCCESS"
task["finishedAt"] = now_iso()
task["summary"] = {
"result": "SKIPPED_ALREADY_EXISTS",
"tmdbId": payload["tmdbId"],
"cmsId": deduped.get("cms_id"),
}
upsert_task(task)
log(task_id, "DEDUPE", "INFO", "命中本地幂等,跳过", task["summary"])
return task
tmdb_result = get_media_detail(payload["tmdbId"], payload["type"])
log(task_id, "TMDB_DETAIL", "INFO", "TMDB 元数据获取成功", {"status": tmdb_result["status"]})
hdhive_search = search_resource(payload["type"], payload["tmdbId"])
hdhive_first = None
search_data = hdhive_search.get("data") or {}
preferred_slug = str(payload.get("slug") or "").strip()
if isinstance(search_data, list) and search_data:
if preferred_slug:
hdhive_first = next(
(item for item in search_data if (item or {}).get("slug") == preferred_slug),
None,
)
if not hdhive_first:
hdhive_first = search_data[0]
elif isinstance(search_data, dict):
items = search_data.get("items") or []
if preferred_slug:
hdhive_first = next(
(item for item in items if (item or {}).get("slug") == preferred_slug),
None,
)
if items:
hdhive_first = hdhive_first or items[0]
if not hdhive_first:
raise AppServiceError(
"HDHIVE 未检索到可用资源",
category="not_found",
code="HDHIVE_RESOURCE_NOT_FOUND",
provider="hdhive",
)
log(task_id, "HDHIVE_SEARCH", "INFO", "HDHIVE 检索成功")
slug = hdhive_first.get("slug")
if not slug:
raise AppServiceError(
"HDHIVE 返回资源缺少 slug无法解锁",
category="validation",
code="HDHIVE_INVALID_RESOURCE",
provider="hdhive",
)
hdhive_unlock = unlock_link(slug)
normalized_resource = normalize_resource(hdhive_first, hdhive_unlock.get("data"))
log(task_id, "HDHIVE_UNLOCK", "INFO", "HDHIVE 解锁成功", {"unlockUrl": normalized_resource["unlockUrl"]})
emby_exists = exists_by_tmdb_id(payload["tmdbId"])
log(task_id, "EMBY_EXISTS", "INFO", "Emby 查询完成", {"exists": emby_exists["exists"]})
if emby_exists["exists"]:
upsert_media_item(
{
"tmdbId": payload["tmdbId"],
"type": payload["type"],
"title": tmdb_result["normalized"]["title"],
"year": tmdb_result["normalized"]["year"],
"tmdbRaw": tmdb_result["data"],
"hdhiveRaw": hdhive_search["data"],
"cmsId": None,
"ingestStatus": "SKIPPED_ALREADY_EXISTS",
}
)
task["status"] = "SUCCESS"
task["finishedAt"] = now_iso()
task["summary"] = {"result": "SKIPPED_ALREADY_EXISTS", "source": "EMBY"}
upsert_task(task)
return task
cms_payload = {
"tmdbId": payload["tmdbId"],
"mediaType": payload["type"],
"title": tmdb_result["normalized"]["title"],
"originalTitle": tmdb_result["normalized"]["originalTitle"],
"year": tmdb_result["normalized"]["year"],
"overview": tmdb_result["normalized"]["overview"],
"posterPath": tmdb_result["normalized"]["posterPath"],
"rating": tmdb_result["normalized"]["rating"],
"genres": tmdb_result["normalized"]["genres"],
"resource": {
"title": normalized_resource["resourceTitle"],
"quality": normalized_resource["quality"],
"size": normalized_resource["size"],
"diskType": normalized_resource["diskType"],
"slug": normalized_resource["slug"],
"source": normalized_resource["source"],
"subtitleLanguage": normalized_resource["subtitleLanguage"],
"unlockUrl": normalized_resource["unlockUrl"],
},
"traceId": trace_id,
}
cms_result = create_resource(cms_payload)
cms_data = cms_result.get("data") or {}
cms_id = (
cms_data.get("id")
or cms_data.get("resourceId")
or (cms_data.get("data") or {}).get("id")
or (cms_data.get("data") or {}).get("resourceId")
or normalized_resource["slug"]
)
log(task_id, "CMS_CREATE", "INFO", "CMS 入库成功", {"cmsId": cms_id})
upsert_media_item(
{
"tmdbId": payload["tmdbId"],
"type": payload["type"],
"title": tmdb_result["normalized"]["title"],
"year": tmdb_result["normalized"]["year"],
"tmdbRaw": tmdb_result["data"],
"hdhiveRaw": hdhive_search["data"],
"cmsId": cms_id,
"ingestStatus": "SUCCESS",
}
)
task["status"] = "SUCCESS"
task["finishedAt"] = now_iso()
task["summary"] = {"result": "CREATED", "cmsId": cms_id}
upsert_task(task)
return task
except Exception as error:
normalized_error = normalize_exception(error)
log(task_id, "FAILED", "ERROR", str(normalized_error), normalized_error.to_dict())
task["status"] = "FAILED"
task["finishedAt"] = now_iso()
task["summary"] = {"error": normalized_error.to_dict()}
upsert_task(task)
return task

182
backend/storage.py Normal file
View File

@@ -0,0 +1,182 @@
import json
import sqlite3
from pathlib import Path
DB_PATH = Path(__file__).resolve().parent / "media_crawler.db"
def get_conn():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db():
conn = get_conn()
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS tasks (
task_id TEXT PRIMARY KEY,
trace_id TEXT NOT NULL,
status TEXT NOT NULL,
input_payload TEXT NOT NULL,
started_at TEXT NOT NULL,
finished_at TEXT,
summary TEXT
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS media_items (
tmdb_id TEXT PRIMARY KEY,
type TEXT NOT NULL,
title TEXT,
year TEXT,
tmdb_raw TEXT,
hdhive_raw TEXT,
cms_id TEXT,
ingest_status TEXT NOT NULL
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS task_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
step TEXT NOT NULL,
level TEXT NOT NULL,
message TEXT NOT NULL,
detail TEXT,
created_at TEXT NOT NULL
)
"""
)
conn.commit()
conn.close()
def upsert_task(task):
conn = get_conn()
conn.execute(
"""
INSERT INTO tasks(task_id, trace_id, status, input_payload, started_at, finished_at, summary)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(task_id) DO UPDATE SET
status=excluded.status,
finished_at=excluded.finished_at,
summary=excluded.summary
""",
(
task["taskId"],
task["traceId"],
task["status"],
json.dumps(task["inputPayload"], ensure_ascii=False),
task["startedAt"],
task.get("finishedAt"),
json.dumps(task.get("summary"), ensure_ascii=False),
),
)
conn.commit()
conn.close()
def insert_log(task_id, step, level, message, detail, created_at):
conn = get_conn()
conn.execute(
"""
INSERT INTO task_logs(task_id, step, level, message, detail, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
task_id,
step,
level,
message,
json.dumps(detail, ensure_ascii=False),
created_at,
),
)
conn.commit()
conn.close()
def upsert_media_item(item):
conn = get_conn()
conn.execute(
"""
INSERT INTO media_items(tmdb_id, type, title, year, tmdb_raw, hdhive_raw, cms_id, ingest_status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(tmdb_id) DO UPDATE SET
type=excluded.type,
title=excluded.title,
year=excluded.year,
tmdb_raw=excluded.tmdb_raw,
hdhive_raw=excluded.hdhive_raw,
cms_id=excluded.cms_id,
ingest_status=excluded.ingest_status
""",
(
str(item["tmdbId"]),
item["type"],
item.get("title"),
item.get("year"),
json.dumps(item.get("tmdbRaw"), ensure_ascii=False),
json.dumps(item.get("hdhiveRaw"), ensure_ascii=False),
item.get("cmsId"),
item["ingestStatus"],
),
)
conn.commit()
conn.close()
def find_media_item(tmdb_id):
conn = get_conn()
row = conn.execute(
"SELECT * FROM media_items WHERE tmdb_id = ?",
(str(tmdb_id),),
).fetchone()
conn.close()
if not row:
return None
return dict(row)
def list_tasks(limit=50):
conn = get_conn()
rows = conn.execute(
"SELECT * FROM tasks ORDER BY started_at DESC LIMIT ?",
(limit,),
).fetchall()
conn.close()
result = []
for row in rows:
item = dict(row)
item["inputPayload"] = json.loads(item.pop("input_payload") or "{}")
item["startedAt"] = item.pop("started_at")
item["finishedAt"] = item.pop("finished_at")
item["taskId"] = item.pop("task_id")
item["traceId"] = item.pop("trace_id")
item["summary"] = json.loads(item["summary"]) if item.get("summary") else None
result.append(item)
return result
def list_logs(task_id):
conn = get_conn()
rows = conn.execute(
"SELECT task_id, step, level, message, detail, created_at FROM task_logs WHERE task_id = ? ORDER BY id ASC",
(task_id,),
).fetchall()
conn.close()
logs = []
for row in rows:
item = dict(row)
item["taskId"] = item.pop("task_id")
item["createdAt"] = item.pop("created_at")
item["detail"] = json.loads(item["detail"]) if item.get("detail") else {}
logs.append(item)
return logs