import time from datetime import datetime from adapters.cms_adapter import create_resource from adapters.emby_adapter import exists_by_tmdb_id from adapters.hdhive_adapter import normalize_resource, search_resource, unlock_link from adapters.tmdb_adapter import get_media_detail from error_handling import AppServiceError, normalize_exception from storage import find_media_item, insert_log, upsert_media_item, upsert_task def _extract_hdhive_items(hdhive_result): payload = (hdhive_result or {}).get("data") if isinstance(payload, dict) and isinstance(payload.get("data"), list): return payload.get("data") or [] if isinstance(payload, list): return payload if isinstance(payload, dict): return payload.get("items") or [] return [] def _extract_hdhive_unlock_data(unlock_result): payload = (unlock_result or {}).get("data") if isinstance(payload, dict) and isinstance(payload.get("data"), dict): return payload.get("data") or {} if isinstance(payload, dict): return payload return {} def now_iso(): return datetime.utcnow().isoformat() + "Z" def new_task_id(): return f"task_{int(time.time() * 1000)}" def log(task_id, step, level, message, detail=None): insert_log(task_id, step, level, message, detail or {}, now_iso()) def run_ingest_task(payload): task_id = new_task_id() trace_id = f"{task_id}_{hex(int(time.time() * 1000))[-6:]}" task = { "taskId": task_id, "traceId": trace_id, "status": "RUNNING", "inputPayload": payload, "startedAt": now_iso(), "finishedAt": None, "summary": None, } upsert_task(task) log(task_id, "START", "INFO", "任务开始", {"payload": payload, "traceId": trace_id}) try: deduped = find_media_item(payload["tmdbId"]) if deduped and deduped.get("ingest_status") == "SUCCESS": task["status"] = "SUCCESS" task["finishedAt"] = now_iso() task["summary"] = { "result": "SKIPPED_ALREADY_EXISTS", "tmdbId": payload["tmdbId"], "cmsId": deduped.get("cms_id"), } upsert_task(task) log(task_id, "DEDUPE", "INFO", "命中本地幂等,跳过", task["summary"]) return task tmdb_result = get_media_detail(payload["tmdbId"], payload["type"]) log(task_id, "TMDB_DETAIL", "INFO", "TMDB 元数据获取成功", {"status": tmdb_result["status"]}) hdhive_search = search_resource(payload["type"], payload["tmdbId"]) hdhive_first = None search_data = _extract_hdhive_items(hdhive_search) preferred_slug = str(payload.get("slug") or "").strip() if search_data: if preferred_slug: hdhive_first = next( (item for item in search_data if (item or {}).get("slug") == preferred_slug), None, ) if not hdhive_first: hdhive_first = search_data[0] if not hdhive_first: raise AppServiceError( "HDHIVE 未检索到可用资源", category="not_found", code="HDHIVE_RESOURCE_NOT_FOUND", provider="hdhive", ) log(task_id, "HDHIVE_SEARCH", "INFO", "HDHIVE 检索成功") slug = hdhive_first.get("slug") if not slug: raise AppServiceError( "HDHIVE 返回资源缺少 slug,无法解锁", category="validation", code="HDHIVE_INVALID_RESOURCE", provider="hdhive", ) hdhive_unlock = unlock_link(slug) normalized_resource = normalize_resource(hdhive_first, _extract_hdhive_unlock_data(hdhive_unlock)) log(task_id, "HDHIVE_UNLOCK", "INFO", "HDHIVE 解锁成功", {"unlockUrl": normalized_resource["unlockUrl"]}) emby_exists = exists_by_tmdb_id(payload["tmdbId"]) log(task_id, "EMBY_EXISTS", "INFO", "Emby 查询完成", {"exists": emby_exists["exists"]}) if emby_exists["exists"]: upsert_media_item( { "tmdbId": payload["tmdbId"], "type": payload["type"], "title": tmdb_result["normalized"]["title"], "year": tmdb_result["normalized"]["year"], "tmdbRaw": tmdb_result["data"], "hdhiveRaw": hdhive_search["data"], "cmsId": None, "ingestStatus": "SKIPPED_ALREADY_EXISTS", } ) task["status"] = "SUCCESS" task["finishedAt"] = now_iso() task["summary"] = {"result": "SKIPPED_ALREADY_EXISTS", "source": "EMBY"} upsert_task(task) return task cms_payload = { "tmdbId": payload["tmdbId"], "mediaType": payload["type"], "title": tmdb_result["normalized"]["title"], "originalTitle": tmdb_result["normalized"]["originalTitle"], "year": tmdb_result["normalized"]["year"], "overview": tmdb_result["normalized"]["overview"], "posterPath": tmdb_result["normalized"]["posterPath"], "rating": tmdb_result["normalized"]["rating"], "genres": tmdb_result["normalized"]["genres"], "resource": { "title": normalized_resource["resourceTitle"], "quality": normalized_resource["quality"], "size": normalized_resource["size"], "diskType": normalized_resource["diskType"], "slug": normalized_resource["slug"], "source": normalized_resource["source"], "subtitleLanguage": normalized_resource["subtitleLanguage"], "unlockUrl": normalized_resource["unlockUrl"], }, "traceId": trace_id, } cms_result = create_resource(cms_payload) cms_data = cms_result.get("data") or {} cms_id = ( cms_data.get("id") or cms_data.get("resourceId") or (cms_data.get("data") or {}).get("id") or (cms_data.get("data") or {}).get("resourceId") or normalized_resource["slug"] ) log(task_id, "CMS_CREATE", "INFO", "CMS 入库成功", {"cmsId": cms_id}) upsert_media_item( { "tmdbId": payload["tmdbId"], "type": payload["type"], "title": tmdb_result["normalized"]["title"], "year": tmdb_result["normalized"]["year"], "tmdbRaw": tmdb_result["data"], "hdhiveRaw": hdhive_search["data"], "cmsId": cms_id, "ingestStatus": "SUCCESS", } ) task["status"] = "SUCCESS" task["finishedAt"] = now_iso() task["summary"] = {"result": "CREATED", "cmsId": cms_id} upsert_task(task) return task except Exception as error: normalized_error = normalize_exception(error) log(task_id, "FAILED", "ERROR", str(normalized_error), normalized_error.to_dict()) task["status"] = "FAILED" task["finishedAt"] = now_iso() task["summary"] = {"error": normalized_error.to_dict()} upsert_task(task) return task