From 88dfc09e2a0b6abde456a376f57b7d2bea41a72e Mon Sep 17 00:00:00 2001 From: zhoujie <929834232@qq.com> Date: Sun, 8 Feb 2026 21:52:29 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(config):=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E5=A4=9A=20LLM=20=E6=8F=90=E4=BE=9B=E5=95=86=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E4=B8=8E=E8=B4=A6=E5=8F=B7=E6=95=B0=E6=8D=AE=E7=9C=8B=E6=9D=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增多 LLM 提供商管理功能,支持添加、删除和切换不同 API 提供商 - 新增账号数据看板,支持可视化展示用户核心指标和笔记点赞排行 - 新增自动获取并保存 xsec_token 功能,提升登录体验 - 新增退出登录功能,支持重新扫码登录 - 新增用户 ID 验证和保存功能,确保账号信息准确性 ♻️ refactor(config): 重构配置管理和 LLM 服务调用 - 重构配置管理器,支持多 LLM 提供商配置和兼容旧配置自动迁移 - 重构 LLM 服务调用逻辑,统一从配置管理器获取激活的提供商信息 - 重构 MCP 客户端,增加单例模式和自动重试机制,提升连接稳定性 - 重构数据看板页面,优化用户数据获取和可视化展示逻辑 🐛 fix(mcp): 修复 MCP 连接和登录状态检查问题 - 修复 MCP 客户端初始化问题,避免重复握手 - 修复登录状态检查逻辑,自动获取并保存 xsec_token - 修复获取我的笔记列表功能,支持通过用户 ID 准确获取 - 修复 JSON-RPC 通知格式问题,确保与 MCP 服务兼容 📝 docs(config): 更新配置文件和代码注释 - 更新配置文件结构,新增多 LLM 提供商配置字段 - 更新代码注释,明确各功能模块的作用和调用方式 - 更新用户界面提示信息,提供更清晰的操作指引 --- config copy.json | 10 + config.json | 23 +- config_manager.py | 79 +++++ llm_service.py | 19 +- main.py | 755 ++++++++++++++++++++++++++++++++++------------ mcp_client.py | 61 +++- requirements.txt | 1 + 7 files changed, 737 insertions(+), 211 deletions(-) create mode 100644 config copy.json diff --git a/config copy.json b/config copy.json new file mode 100644 index 0000000..52ab15c --- /dev/null +++ b/config copy.json @@ -0,0 +1,10 @@ +{ + "api_key": "sk-d212b926f51f4f0f9297629cd2ab77b4", + "base_url": "https://api.deepseek.com/v1", + "sd_url": "http://127.0.0.1:7860", + "mcp_url": "http://localhost:18060/mcp", + "model": "deepseek-reasoner", + "persona": "温柔知性的时尚博主", + "auto_reply_enabled": false, + "schedule_enabled": false +} \ No newline at end of file diff --git a/config.json b/config.json index 52ab15c..41d0310 100644 --- a/config.json +++ b/config.json @@ -1,10 +1,25 @@ { - "api_key": "sk-d212b926f51f4f0f9297629cd2ab77b4", - "base_url": "https://api.deepseek.com/v1", + "api_key": "sk-NPZECL5m3BmZv0S9YO9KOd179pepRH08iYeAn1Tk07Jux9Br", + "base_url": "https://wolfai.top/v1", "sd_url": "http://127.0.0.1:7860", "mcp_url": "http://localhost:18060/mcp", - "model": "deepseek-reasoner", + "model": "gemini-3-flash-preview", "persona": "温柔知性的时尚博主", "auto_reply_enabled": false, - "schedule_enabled": false + "schedule_enabled": false, + "my_user_id": "69872540000000002303cc42", + "active_llm": "wolfai", + "llm_providers": [ + { + "name": "默认", + "api_key": "sk-d212b926f51f4f0f9297629cd2ab77b4", + "base_url": "https://api.deepseek.com/v1" + }, + { + "name": "wolfai", + "api_key": "sk-NPZECL5m3BmZv0S9YO9KOd179pepRH08iYeAn1Tk07Jux9Br", + "base_url": "https://wolfai.top/v1" + } + ], + "xsec_token": "ABS1TagQqhCpZmeNlq0VoCfNEyI6Q83GJzjTGJvzEAq5I=" } \ No newline at end of file diff --git a/config_manager.py b/config_manager.py index fdab8f9..17bf9c9 100644 --- a/config_manager.py +++ b/config_manager.py @@ -20,6 +20,9 @@ DEFAULT_CONFIG = { "persona": "温柔知性的时尚博主", "auto_reply_enabled": False, "schedule_enabled": False, + "my_user_id": "", + "active_llm": "", + "llm_providers": [], } @@ -80,3 +83,79 @@ class ConfigManager: def ensure_workspace(self): """确保工作空间目录存在""" os.makedirs(OUTPUT_DIR, exist_ok=True) + + # ---------- 多 LLM 提供商管理 ---------- + + def get_llm_providers(self) -> list[dict]: + """获取所有 LLM 提供商配置""" + providers = self._config.get("llm_providers", []) + # 兼容旧配置: 如果 providers 为空但有 api_key,自动迁移 + if not providers and self._config.get("api_key"): + default_provider = { + "name": "默认", + "api_key": self._config["api_key"], + "base_url": self._config.get("base_url", "https://api.openai.com/v1"), + } + providers = [default_provider] + self._config["llm_providers"] = providers + self._config["active_llm"] = "默认" + self.save() + return providers + + def get_llm_provider_names(self) -> list[str]: + """获取所有提供商名称列表""" + return [p["name"] for p in self.get_llm_providers()] + + def get_active_llm(self) -> dict | None: + """获取当前激活的 LLM 提供商配置""" + active_name = self._config.get("active_llm", "") + for p in self.get_llm_providers(): + if p["name"] == active_name: + return p + # 没找到就返回第一个 + providers = self.get_llm_providers() + return providers[0] if providers else None + + def add_llm_provider(self, name: str, api_key: str, base_url: str) -> str: + """添加一个 LLM 提供商,返回状态消息""" + name = name.strip() + if not name: + return "❌ 名称不能为空" + if not api_key.strip(): + return "❌ API Key 不能为空" + providers = self.get_llm_providers() + for p in providers: + if p["name"] == name: + return f"❌ 名称「{name}」已存在,请换一个" + providers.append({ + "name": name, + "api_key": api_key.strip(), + "base_url": (base_url or "https://api.openai.com/v1").strip().rstrip("/"), + }) + self._config["llm_providers"] = providers + if not self._config.get("active_llm"): + self._config["active_llm"] = name + self.save() + return f"✅ 已添加「{name}」" + + def remove_llm_provider(self, name: str) -> str: + """删除一个 LLM 提供商""" + providers = self.get_llm_providers() + new_providers = [p for p in providers if p["name"] != name] + if len(new_providers) == len(providers): + return f"⚠️ 未找到「{name}」" + self._config["llm_providers"] = new_providers + if self._config.get("active_llm") == name: + self._config["active_llm"] = new_providers[0]["name"] if new_providers else "" + self.save() + return f"✅ 已删除「{name}」" + + def set_active_llm(self, name: str): + """切换当前激活的 LLM 提供商""" + self._config["active_llm"] = name + # 同步到兼容字段 + p = self.get_active_llm() + if p: + self._config["api_key"] = p["api_key"] + self._config["base_url"] = p["base_url"] + self.save() diff --git a/llm_service.py b/llm_service.py index eb47973..739f62f 100644 --- a/llm_service.py +++ b/llm_service.py @@ -127,6 +127,9 @@ class LLMService: "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", } + if json_mode: + user_message = user_message + "\n请以json格式返回。" + payload = { "model": self.model, "messages": [ @@ -164,10 +167,18 @@ class LLMService: """获取可用模型列表""" url = f"{self.base_url}/models" headers = {"Authorization": f"Bearer {self.api_key}"} - resp = requests.get(url, headers=headers, timeout=10) - resp.raise_for_status() - data = resp.json() - return [item["id"] for item in data.get("data", [])] + try: + resp = requests.get(url, headers=headers, timeout=10) + resp.raise_for_status() + text = resp.text.strip() + if not text: + logger.warning("GET %s 返回空响应", url) + return [] + data = resp.json() + return [item["id"] for item in data.get("data", [])] + except Exception as e: + logger.warning("获取模型列表失败 (%s): %s", url, e) + return [] def generate_copy(self, topic: str, style: str) -> dict: """生成小红书文案""" diff --git a/main.py b/main.py index ad70240..44ddb09 100644 --- a/main.py +++ b/main.py @@ -5,16 +5,30 @@ import gradio as gr import os import re +import json import time import logging import platform import subprocess from PIL import Image +import matplotlib +import matplotlib.pyplot as plt from config_manager import ConfigManager, OUTPUT_DIR from llm_service import LLMService from sd_service import SDService, DEFAULT_NEGATIVE -from mcp_client import MCPClient +from mcp_client import MCPClient, get_mcp_client + +# ================= matplotlib 中文字体配置 ================= +_font_candidates = ["Microsoft YaHei", "SimHei", "PingFang SC", "WenQuanYi Micro Hei"] +for _fn in _font_candidates: + try: + matplotlib.font_manager.findfont(_fn, fallback_to_default=False) + plt.rcParams["font.sans-serif"] = [_fn] + break + except Exception: + continue +plt.rcParams["axes.unicode_minus"] = False # ================= 日志配置 ================= @@ -36,30 +50,94 @@ os.environ["NO_PROXY"] = "127.0.0.1,localhost" cfg = ConfigManager() cfg.ensure_workspace() -mcp = MCPClient(cfg.get("mcp_url", "http://localhost:18060/mcp")) +mcp = get_mcp_client(cfg.get("mcp_url", "http://localhost:18060/mcp")) + +# ================================================== +# LLM 多提供商管理 +# ================================================== + + +def _get_llm_config() -> tuple[str, str, str]: + """获取当前激活 LLM 的 (api_key, base_url, model)""" + p = cfg.get_active_llm() + if p: + return p["api_key"], p["base_url"], cfg.get("model", "") + return "", "", "" + + +def connect_llm(provider_name): + """连接选中的 LLM 提供商并获取模型列表""" + if not provider_name: + return gr.update(choices=[], value=None), "⚠️ 请先选择或添加 LLM 提供商" + cfg.set_active_llm(provider_name) + p = cfg.get_active_llm() + if not p: + return gr.update(choices=[], value=None), "❌ 未找到该提供商配置" + try: + svc = LLMService(p["api_key"], p["base_url"]) + models = svc.get_models() + if models: + return ( + gr.update(choices=models, value=models[0]), + f"✅ 已连接「{provider_name}」,加载 {len(models)} 个模型", + ) + else: + # API 无法获取模型列表,保留手动输入 + current_model = cfg.get("model", "") + return ( + gr.update(choices=[current_model] if current_model else [], value=current_model or None), + f"⚠️ 已连接「{provider_name}」,但未获取到模型列表,请手动输入模型名", + ) + except Exception as e: + logger.error("LLM 连接失败: %s", e) + current_model = cfg.get("model", "") + return ( + gr.update(choices=[current_model] if current_model else [], value=current_model or None), + f"❌ 连接「{provider_name}」失败: {e}", + ) + + +def add_llm_provider(name, api_key, base_url): + """添加新的 LLM 提供商""" + msg = cfg.add_llm_provider(name, api_key, base_url) + names = cfg.get_llm_provider_names() + active = cfg.get("active_llm", "") + return ( + gr.update(choices=names, value=active), + msg, + ) + + +def remove_llm_provider(provider_name): + """删除 LLM 提供商""" + if not provider_name: + return gr.update(choices=cfg.get_llm_provider_names(), value=cfg.get("active_llm", "")), "⚠️ 请先选择要删除的提供商" + msg = cfg.remove_llm_provider(provider_name) + names = cfg.get_llm_provider_names() + active = cfg.get("active_llm", "") + return ( + gr.update(choices=names, value=active), + msg, + ) + + +def on_provider_selected(provider_name): + """切换 LLM 提供商时更新显示信息""" + if not provider_name: + return "未选择提供商" + for p in cfg.get_llm_providers(): + if p["name"] == provider_name: + cfg.set_active_llm(provider_name) + masked_key = p["api_key"][:8] + "***" if len(p["api_key"]) > 8 else "***" + return f"**{provider_name}** \nAPI Key: `{masked_key}` \nBase URL: `{p['base_url']}`" + return "未找到该提供商" + # ================================================== # Tab 1: 内容创作 # ================================================== -def connect_llm(api_key, base_url): - """连接 LLM 并获取模型列表""" - if not api_key or not base_url: - return gr.update(choices=[]), "⚠️ 请先填写 API Key 和 Base URL" - try: - svc = LLMService(api_key, base_url) - models = svc.get_models() - cfg.update({"api_key": api_key, "base_url": base_url}) - return ( - gr.update(choices=models, value=models[0] if models else None), - f"✅ 已连接,加载 {len(models)} 个模型", - ) - except Exception as e: - logger.error("LLM 连接失败: %s", e) - return gr.update(), f"❌ 连接失败: {e}" - - def connect_sd(sd_url): """连接 SD 并获取模型列表""" try: @@ -78,7 +156,7 @@ def connect_sd(sd_url): def check_mcp_status(mcp_url): """检查 MCP 连接状态""" try: - client = MCPClient(mcp_url) + client = get_mcp_client(mcp_url) ok, msg = client.check_connection() if ok: cfg.set("mcp_url", mcp_url) @@ -96,7 +174,7 @@ def check_mcp_status(mcp_url): def get_login_qrcode(mcp_url): """获取小红书登录二维码""" try: - client = MCPClient(mcp_url) + client = get_mcp_client(mcp_url) result = client.get_login_qrcode() if "error" in result: return None, f"❌ 获取二维码失败: {result['error']}" @@ -110,25 +188,82 @@ def get_login_qrcode(mcp_url): return None, f"❌ 获取二维码失败: {e}" -def check_login(mcp_url): - """检查小红书登录状态""" +def logout_xhs(mcp_url): + """退出登录:清除 cookies 并重置本地 token""" try: - client = MCPClient(mcp_url) + client = get_mcp_client(mcp_url) + result = client.delete_cookies() + if "error" in result: + return f"❌ 退出失败: {result['error']}" + cfg.set("xsec_token", "") + client._reset() + return "✅ 已退出登录,可以重新扫码登录" + except Exception as e: + logger.error("退出登录失败: %s", e) + return f"❌ 退出失败: {e}" + + +def _auto_fetch_xsec_token(mcp_url) -> str: + """从推荐列表自动获取一个有效的 xsec_token""" + try: + client = get_mcp_client(mcp_url) + entries = client.list_feeds_parsed() + for e in entries: + token = e.get("xsec_token", "") + if token: + return token + except Exception as e: + logger.warning("自动获取 xsec_token 失败: %s", e) + return "" + + +def check_login(mcp_url): + """检查登录状态,登录成功后自动获取 xsec_token 并保存""" + try: + client = get_mcp_client(mcp_url) result = client.check_login_status() if "error" in result: - return f"❌ {result['error']}" + return f"❌ {result['error']}", gr.update(), gr.update() text = result.get("text", "") if "未登录" in text: - return f"🔴 {text}" - return f"🟢 {text}" + return f"🔴 {text}", gr.update(), gr.update() + + # 登录成功 → 自动获取 xsec_token + token = _auto_fetch_xsec_token(mcp_url) + if token: + cfg.set("xsec_token", token) + logger.info("自动获取 xsec_token 成功") + return ( + f"🟢 {text}\n\n✅ xsec_token 已自动获取并保存", + gr.update(value=cfg.get("my_user_id", "")), + gr.update(value=token), + ) + return f"🟢 {text}\n\n⚠️ 自动获取 xsec_token 失败,请手动刷新", gr.update(), gr.update() except Exception as e: - return f"❌ 检查登录状态失败: {e}" + return f"❌ 检查登录状态失败: {e}", gr.update(), gr.update() -def generate_copy(api_key, base_url, model, topic, style): +def save_my_user_id(user_id_input): + """保存用户 ID (验证 24 位十六进制格式)""" + uid = (user_id_input or "").strip() + if not uid: + cfg.set("my_user_id", "") + return "⚠️ 已清除用户 ID" + if not re.match(r'^[0-9a-fA-F]{24}$', uid): + return ( + "❌ 格式错误!用户 ID 应为 24 位十六进制字符串\n" + f"你输入的: `{uid}` ({len(uid)} 位)\n\n" + "💡 如果你输入的是小红书号 (纯数字如 18688457507),那不是 userId。" + ) + cfg.set("my_user_id", uid) + return f"✅ 用户 ID 已保存: `{uid}`" + + +def generate_copy(model, topic, style): """生成文案""" + api_key, base_url, _ = _get_llm_config() if not api_key: - return "", "", "", "", "❌ 缺少 API Key" + return "", "", "", "", "❌ 请先配置并连接 LLM 提供商" try: svc = LLMService(api_key, base_url, model) data = svc.generate_copy(topic, style) @@ -206,7 +341,7 @@ def publish_to_xhs(title, content, tags_str, images, local_images, mcp_url, sche if not title: return "❌ 缺少标题" - client = MCPClient(mcp_url) + client = get_mcp_client(mcp_url) # 收集图片路径 image_paths = [] @@ -264,7 +399,7 @@ def search_hotspots(keyword, sort_by, mcp_url): if not keyword: return "❌ 请输入搜索关键词", "" try: - client = MCPClient(mcp_url) + client = get_mcp_client(mcp_url) result = client.search_feeds(keyword, sort_by=sort_by) if "error" in result: return f"❌ 搜索失败: {result['error']}", "" @@ -275,10 +410,13 @@ def search_hotspots(keyword, sort_by, mcp_url): return f"❌ 搜索失败: {e}", "" -def analyze_and_suggest(api_key, base_url, model, keyword, search_result): +def analyze_and_suggest(model, keyword, search_result): """AI 分析热点并给出建议""" if not search_result: return "❌ 请先搜索", "", "" + api_key, base_url, _ = _get_llm_config() + if not api_key: + return "❌ 请先配置 LLM 提供商", "", "" try: svc = LLMService(api_key, base_url, model) analysis = svc.analyze_hotspots(search_result) @@ -303,10 +441,13 @@ def analyze_and_suggest(api_key, base_url, model, keyword, search_result): return f"❌ 分析失败: {e}", "", "" -def generate_from_hotspot(api_key, base_url, model, topic_from_hotspot, style, search_result): +def generate_from_hotspot(model, topic_from_hotspot, style, search_result): """基于热点分析生成文案""" if not topic_from_hotspot: return "", "", "", "", "❌ 请先选择或输入选题" + api_key, base_url, _ = _get_llm_config() + if not api_key: + return "", "", "", "", "❌ 请先配置 LLM 提供商" try: svc = LLMService(api_key, base_url, model) data = svc.generate_copy_with_reference( @@ -342,7 +483,7 @@ def _fetch_and_cache(keyword, mcp_url, cache_name="proactive"): """通用: 获取笔记列表并缓存""" global _cached_proactive_entries, _cached_my_note_entries try: - client = MCPClient(mcp_url) + client = get_mcp_client(mcp_url) if keyword and keyword.strip(): entries = client.search_feeds_parsed(keyword.strip()) src = f"搜索「{keyword.strip()}」" @@ -382,11 +523,18 @@ def _pick_from_cache(selected, cache_name="proactive"): if not selected or not cache: return "", "", "" try: + # 尝试从 [N] 前缀提取序号 idx = int(selected.split("]")[0].replace("[", "")) - 1 - e = cache[idx] - return e["feed_id"], e["xsec_token"], e.get("title", "") + if 0 <= idx < len(cache): + e = cache[idx] + return e["feed_id"], e["xsec_token"], e.get("title", "") except (ValueError, IndexError): - return "", "", "" + pass + # 回退: 模糊匹配标题 + for e in cache: + if e.get("title", "")[:15] in selected: + return e["feed_id"], e["xsec_token"], e.get("title", "") + return "", "", "" # ---- 模块 A: 主动评论他人 ---- @@ -404,7 +552,7 @@ def load_note_for_comment(feed_id, xsec_token, mcp_url): if not feed_id or not xsec_token: return "❌ 请先选择笔记", "", "", "" try: - client = MCPClient(mcp_url) + client = get_mcp_client(mcp_url) result = client.get_feed_detail(feed_id, xsec_token, load_all_comments=True) if "error" in result: return f"❌ {result['error']}", "", "", "" @@ -422,11 +570,12 @@ def load_note_for_comment(feed_id, xsec_token, mcp_url): return f"❌ {e}", "", "", "" -def ai_generate_comment(api_key, base_url, model, persona, +def ai_generate_comment(model, persona, post_title, post_content, existing_comments): """AI 生成主动评论""" - if not api_key or not base_url: - return "⚠️ 请先配置 API Key", "❌ LLM 未配置" + api_key, base_url, _ = _get_llm_config() + if not api_key: + return "⚠️ 请先配置 LLM 提供商", "❌ LLM 未配置" if not model: return "⚠️ 请先连接 LLM", "❌ 未选模型" if not post_title and not post_content: @@ -447,7 +596,7 @@ def send_comment(feed_id, xsec_token, comment_content, mcp_url): if not all([feed_id, xsec_token, comment_content]): return "❌ 缺少必要参数 (笔记ID / token / 评论内容)" try: - client = MCPClient(mcp_url) + client = get_mcp_client(mcp_url) result = client.post_comment(feed_id, xsec_token, comment_content) if "error" in result: return f"❌ {result['error']}" @@ -458,8 +607,77 @@ def send_comment(feed_id, xsec_token, comment_content, mcp_url): # ---- 模块 B: 回复我的笔记评论 ---- -def fetch_my_notes(keyword, mcp_url): - return _fetch_and_cache(keyword, mcp_url, "my_notes") +def fetch_my_notes(mcp_url): + """通过已保存的 userId 获取我的笔记列表""" + global _cached_my_note_entries + my_uid = cfg.get("my_user_id", "") + xsec = cfg.get("xsec_token", "") + if not my_uid: + return ( + gr.update(choices=[], value=None), + "❌ 未配置用户 ID,请先到「账号登录」页填写并保存", + ) + if not xsec: + return ( + gr.update(choices=[], value=None), + "❌ 未获取 xsec_token,请先登录", + ) + try: + client = get_mcp_client(mcp_url) + result = client.get_user_profile(my_uid, xsec) + if "error" in result: + return gr.update(choices=[], value=None), f"❌ {result['error']}" + + # 从 raw 中解析 feeds + raw = result.get("raw", {}) + text = result.get("text", "") + data = None + if raw and isinstance(raw, dict): + for item in raw.get("content", []): + if item.get("type") == "text": + try: + data = json.loads(item["text"]) + except (json.JSONDecodeError, KeyError): + pass + if not data: + try: + data = json.loads(text) + except (json.JSONDecodeError, TypeError): + pass + + feeds = (data or {}).get("feeds") or [] + if not feeds: + return ( + gr.update(choices=[], value=None), + "⚠️ 未找到你的笔记,可能账号还没有发布内容", + ) + + entries = [] + for f in feeds: + nc = f.get("noteCard") or {} + user = nc.get("user") or {} + interact = nc.get("interactInfo") or {} + entries.append({ + "feed_id": f.get("id", ""), + "xsec_token": f.get("xsecToken", ""), + "title": nc.get("displayTitle", "未知标题"), + "author": user.get("nickname", user.get("nickName", "")), + "user_id": user.get("userId", ""), + "likes": interact.get("likedCount", "0"), + "type": nc.get("type", ""), + }) + + _cached_my_note_entries = entries + choices = [ + f"[{i+1}] {e['title'][:20]} | {e['type']} | ❤{e['likes']}" + for i, e in enumerate(entries) + ] + return ( + gr.update(choices=choices, value=choices[0] if choices else None), + f"✅ 找到 {len(entries)} 篇笔记", + ) + except Exception as e: + return gr.update(choices=[], value=None), f"❌ {e}" def on_my_note_selected(selected): @@ -471,7 +689,7 @@ def fetch_my_note_comments(feed_id, xsec_token, mcp_url): if not feed_id or not xsec_token: return "❌ 请先选择笔记", "" try: - client = MCPClient(mcp_url) + client = get_mcp_client(mcp_url) result = client.get_feed_detail(feed_id, xsec_token, load_all_comments=True) if "error" in result: return f"❌ {result['error']}", "" @@ -480,10 +698,11 @@ def fetch_my_note_comments(feed_id, xsec_token, mcp_url): return f"❌ {e}", "" -def ai_reply_comment(api_key, base_url, model, persona, post_title, comment_text): +def ai_reply_comment(model, persona, post_title, comment_text): """AI 生成评论回复""" - if not api_key or not base_url: - return "⚠️ 请先在全局设置中填写 API Key 和 Base URL", "❌ LLM 未配置" + api_key, base_url, _ = _get_llm_config() + if not api_key: + return "⚠️ 请先配置 LLM 提供商", "❌ LLM 未配置" if not model: return "⚠️ 请先连接 LLM 并选择模型", "❌ 未选择模型" if not comment_text: @@ -502,7 +721,7 @@ def send_reply(feed_id, xsec_token, reply_content, mcp_url): if not all([feed_id, xsec_token, reply_content]): return "❌ 缺少必要参数" try: - client = MCPClient(mcp_url) + client = get_mcp_client(mcp_url) result = client.post_comment(feed_id, xsec_token, reply_content) if "error" in result: return f"❌ 回复失败: {result['error']}" @@ -512,90 +731,154 @@ def send_reply(feed_id, xsec_token, reply_content, mcp_url): # ================================================== -# Tab 4: 数据看板 +# Tab 4: 数据看板 (我的账号) # ================================================== -# 全局缓存: 最近获取的用户列表 -_cached_user_entries: list[dict] = [] - - -def fetch_user_list_from_feeds(mcp_url): - """从推荐列表中提取用户列表, 供数据看板使用""" - global _cached_user_entries +def _parse_profile_json(text: str): + """尝试从文本中解析用户 profile JSON""" + if not text: + return None + # 直接 JSON try: - client = MCPClient(mcp_url) - entries = client.list_feeds_parsed() - - # 去重: 按 user_id - seen = set() - users = [] - for e in entries: - uid = e.get("user_id", "") - if uid and uid not in seen: - seen.add(uid) - users.append({ - "user_id": uid, - "xsec_token": e.get("xsec_token", ""), - "nickname": e.get("author", "未知"), - }) - - _cached_user_entries = users - - if not users: - return gr.update(choices=[], value=None), "⚠️ 未找到用户信息" - - choices = [ - f"@{u['nickname']} ({u['user_id'][:8]}...)" - for u in users - ] - return ( - gr.update(choices=choices, value=choices[0]), - f"✅ 发现 {len(users)} 位用户,请在下拉框中选择", - ) - except Exception as e: - _cached_user_entries = [] - return gr.update(choices=[], value=None), f"❌ {e}" + return json.loads(text) + except (json.JSONDecodeError, TypeError): + pass + # 可能包含 Markdown 代码块 + m = re.search(r'```(?:json)?\s*\n([\s\S]+?)\n```', text) + if m: + try: + return json.loads(m.group(1)) + except (json.JSONDecodeError, TypeError): + pass + return None -def on_user_selected(selected_user): - """用户下拉框选择回调, 自动填充 user_id 和 xsec_token""" - global _cached_user_entries - if not selected_user or not _cached_user_entries: - return gr.update(), gr.update() - - # 匹配 "(user_id_prefix...)" - for u in _cached_user_entries: - if u["user_id"][:8] in selected_user: - return u["user_id"], u["xsec_token"] - - return gr.update(), gr.update() +def _parse_count(val) -> float: + """解析数字字符串, 支持 '1.2万' 格式""" + if isinstance(val, (int, float)): + return float(val) + s = str(val).strip() + if "万" in s: + try: + return float(s.replace("万", "")) * 10000 + except ValueError: + pass + try: + return float(s) + except ValueError: + return 0.0 -def fetch_user_data(user_id, xsec_token, mcp_url): - """获取用户主页数据""" +def fetch_my_profile(user_id, xsec_token, mcp_url): + """获取我的账号数据, 返回结构化信息 + 可视化图表""" if not user_id or not xsec_token: - return "❌ 请填写用户 ID 和 xsec_token", "" + return "❌ 请填写你的用户 ID 和 xsec_token", "", None, None, None try: - client = MCPClient(mcp_url) + client = get_mcp_client(mcp_url) result = client.get_user_profile(user_id, xsec_token) if "error" in result: - return f"❌ 获取失败: {result['error']}", "" - return "✅ 数据加载完成", result.get("text", "无数据") - except Exception as e: - return f"❌ 获取数据失败: {e}", "" + return f"❌ {result['error']}", "", None, None, None + raw = result.get("raw", {}) + text = result.get("text", "") + + # 尝试从 raw 或 text 解析 JSON + data = None + if raw and isinstance(raw, dict): + content_list = raw.get("content", []) + for item in content_list: + if item.get("type") == "text": + data = _parse_profile_json(item.get("text", "")) + if data: + break + if not data: + data = _parse_profile_json(text) + if not data: + return "✅ 数据加载完成 (纯文本)", text, None, None, None + + # ---- 提取基本信息 (注意 MCP 对新号可能返回 null) ---- + basic = data.get("userBasicInfo") or {} + interactions = data.get("interactions") or [] + feeds = data.get("feeds") or [] + + gender_map = {0: "未知", 1: "男", 2: "女"} + info_lines = [ + f"## 👤 {basic.get('nickname', '未知')}", + f"- **小红书号**: {basic.get('redId', '-')}", + f"- **性别**: {gender_map.get(basic.get('gender', 0), '未知')}", + f"- **IP 属地**: {basic.get('ipLocation', '-')}", + f"- **简介**: {basic.get('desc', '-')}", + "", + "### 📊 核心数据", + ] + for inter in interactions: + info_lines.append(f"- **{inter.get('name', '')}**: {inter.get('count', '0')}") + + info_lines.append(f"\n### 📝 展示笔记: {len(feeds)} 篇") + profile_md = "\n".join(info_lines) + + # ---- 互动数据柱状图 ---- + fig_interact = None + if interactions: + inter_data = {i["name"]: _parse_count(i["count"]) for i in interactions} + fig_interact, ax = plt.subplots(figsize=(4, 3), dpi=100) + labels = list(inter_data.keys()) + values = list(inter_data.values()) + colors = ["#FF6B6B", "#4ECDC4", "#45B7D1"][:len(labels)] + ax.bar(labels, values, color=colors, edgecolor="white", linewidth=0.5) + ax.set_title("账号核心指标", fontsize=12, fontweight="bold") + for i, v in enumerate(values): + display = f"{v/10000:.1f}万" if v >= 10000 else str(int(v)) + ax.text(i, v + max(values) * 0.02, display, ha="center", fontsize=9) + ax.set_ylabel("") + ax.spines["top"].set_visible(False) + ax.spines["right"].set_visible(False) + fig_interact.tight_layout() + + # ---- 笔记点赞分布图 ---- + fig_notes = None + if feeds: + titles, likes = [], [] + for f in feeds[:15]: + nc = f.get("noteCard") or {} + t = (nc.get("displayTitle", "") or "无标题")[:12] + lk = _parse_count((nc.get("interactInfo") or {}).get("likedCount", "0")) + titles.append(t) + likes.append(lk) + + fig_notes, ax2 = plt.subplots(figsize=(7, 3.5), dpi=100) + ax2.barh(range(len(titles)), likes, color="#FF6B6B", edgecolor="white") + ax2.set_yticks(range(len(titles))) + ax2.set_yticklabels(titles, fontsize=8) + ax2.set_title(f"笔记点赞排行 (Top {len(titles)})", fontsize=12, fontweight="bold") + ax2.invert_yaxis() + for i, v in enumerate(likes): + display = f"{v/10000:.1f}万" if v >= 10000 else str(int(v)) + ax2.text(v + max(likes) * 0.01 if max(likes) > 0 else 0, i, display, va="center", fontsize=8) + ax2.spines["top"].set_visible(False) + ax2.spines["right"].set_visible(False) + fig_notes.tight_layout() + + # ---- 笔记详情表格 (Markdown) ---- + table_lines = [ + "### 📋 笔记数据明细", + "| # | 标题 | 类型 | ❤ 点赞 |", + "|---|------|------|--------|", + ] + for i, f in enumerate(feeds): + nc = f.get("noteCard") or {} + t = (nc.get("displayTitle", "") or "无标题")[:25] + tp = "📹 视频" if nc.get("type") == "video" else "📷 图文" + lk = (nc.get("interactInfo") or {}).get("likedCount", "0") + table_lines.append(f"| {i+1} | {t} | {tp} | {lk} |") + notes_table = "\n".join(table_lines) + + return "✅ 数据加载完成", profile_md, fig_interact, fig_notes, notes_table -def fetch_homepage_feeds(mcp_url): - """获取首页推荐""" - try: - client = MCPClient(mcp_url) - result = client.list_feeds() - if "error" in result: - return f"❌ {result['error']}", "" - return "✅ 推荐列表已加载", result.get("text", "无数据") except Exception as e: - return f"❌ {e}", "" + logger.error(f"获取我的数据失败: {e}") + return f"❌ {e}", "", None, None, None # ================================================== @@ -624,18 +907,46 @@ with gr.Blocks( # ============ 全局设置栏 ============ with gr.Accordion("⚙️ 全局设置 (自动保存)", open=False): + gr.Markdown("#### 🤖 LLM 提供商 (支持所有 OpenAI 兼容接口)") with gr.Row(): - api_key = gr.Textbox( - label="LLM API Key", value=config["api_key"], - type="password", scale=2, + llm_provider = gr.Dropdown( + label="选择 LLM 提供商", + choices=cfg.get_llm_provider_names(), + value=cfg.get("active_llm", ""), + interactive=True, scale=2, ) - base_url = gr.Textbox( - label="LLM Base URL", value=config["base_url"], scale=2, + btn_connect_llm = gr.Button("🔗 连接 LLM", size="sm", scale=1) + with gr.Row(): + llm_model = gr.Dropdown( + label="LLM 模型", value=config["model"], + allow_custom_value=True, interactive=True, scale=2, ) + llm_provider_info = gr.Markdown( + value="*选择提供商后显示详情*", + ) + with gr.Accordion("➕ 添加 / 管理 LLM 提供商", open=False): + with gr.Row(): + new_provider_name = gr.Textbox( + label="名称", placeholder="如: DeepSeek / GPT-4o / 通义千问", + scale=1, + ) + new_provider_key = gr.Textbox( + label="API Key", type="password", scale=2, + ) + new_provider_url = gr.Textbox( + label="Base URL", placeholder="https://api.openai.com/v1", + value="https://api.openai.com/v1", scale=2, + ) + with gr.Row(): + btn_add_provider = gr.Button("✅ 添加提供商", variant="primary", size="sm") + btn_del_provider = gr.Button("🗑️ 删除当前提供商", variant="stop", size="sm") + provider_mgmt_status = gr.Markdown("") + + gr.Markdown("---") + with gr.Row(): mcp_url = gr.Textbox( label="MCP Server URL", value=config["mcp_url"], scale=2, ) - with gr.Row(): sd_url = gr.Textbox( label="SD WebUI URL", value=config["sd_url"], scale=2, ) @@ -644,14 +955,9 @@ with gr.Blocks( value=config["persona"], scale=3, ) with gr.Row(): - btn_connect_llm = gr.Button("🔗 连接 LLM", size="sm") btn_connect_sd = gr.Button("🎨 连接 SD", size="sm") btn_check_mcp = gr.Button("📡 检查 MCP", size="sm") with gr.Row(): - llm_model = gr.Dropdown( - label="LLM 模型", value=config["model"], - allow_custom_value=True, interactive=True, scale=2, - ) sd_model = gr.Dropdown( label="SD 模型", allow_custom_value=True, interactive=True, scale=2, @@ -837,13 +1143,9 @@ with gr.Blocks( "粘贴要回复的评论 → AI 生成回复 → 一键发送" ) - # 笔记选择器 + # 笔记选择器 (自动用已保存的 userId 获取) with gr.Row(): - my_keyword = gr.Textbox( - label="🔍 搜索我的笔记关键词 (留空获取推荐)", - placeholder="我发布过的笔记关键词…", - ) - btn_my_fetch = gr.Button("🔍 获取笔记", variant="primary") + btn_my_fetch = gr.Button("🔍 获取我的笔记", variant="primary") with gr.Row(): my_selector = gr.Dropdown( label="📋 选择我的笔记", @@ -889,26 +1191,57 @@ with gr.Blocks( with gr.Tab("🔐 账号登录"): gr.Markdown( "### 小红书账号登录\n" - "> 点击获取二维码 → 用小红书 App 扫码 → 确认登录 → 检查状态" + "> 扫码登录后自动获取 xsec_token,配合用户 ID 即可使用所有功能" ) with gr.Row(): with gr.Column(scale=1): gr.Markdown( "**操作步骤:**\n" "1. 确保 MCP 服务已启动\n" - "2. 点击「获取登录二维码」\n" - "3. 用小红书 App 扫码并确认\n" - "4. 点击「检查登录状态」验证\n\n" + "2. 点击「获取登录二维码」→ 用小红书 App 扫码\n" + "3. 点击「检查登录状态」→ 自动获取并保存 xsec_token\n" + "4. 首次使用请填写你的用户 ID 并点击保存\n\n" "⚠️ 登录后不要在其他网页端登录同一账号,否则会被踢出" ) btn_get_qrcode = gr.Button( "📱 获取登录二维码", variant="primary", size="lg", ) btn_check_login = gr.Button( - "🔍 检查登录状态", variant="secondary", size="lg", + "🔍 检查登录状态 (自动获取 Token)", + variant="secondary", size="lg", + ) + btn_logout = gr.Button( + "🚪 退出登录 (重新扫码)", + variant="stop", size="lg", ) login_status = gr.Markdown("🔄 等待操作...") + gr.Markdown("---") + gr.Markdown( + "#### 📌 我的账号信息\n" + "> **注意**: 小红书号 ≠ 用户 ID\n" + "> - **小红书号 (redId)**: 如 `18688457507`,是你在 App 个人页看到的\n" + "> - **用户 ID (userId)**: 如 `5a695db6e8ac2b72e8af2a53`,24位十六进制字符串\n\n" + "💡 **如何获取 userId?**\n" + "1. 用浏览器打开你的小红书主页\n" + "2. 网址格式为: `xiaohongshu.com/user/profile/xxxxxxxx`\n" + "3. `profile/` 后面的就是你的 userId" + ) + login_user_id = gr.Textbox( + label="我的用户 ID (24位 userId, 非小红书号)", + value=config.get("my_user_id", ""), + placeholder="如: 5a695db6e8ac2b72e8af2a53", + ) + login_xsec_token = gr.Textbox( + label="xsec_token (登录后自动获取)", + value=config.get("xsec_token", ""), + interactive=False, + ) + btn_save_uid = gr.Button( + "💾 保存用户 ID", variant="secondary", + ) + save_uid_status = gr.Markdown("") + with gr.Column(scale=1): qr_image = gr.Image( label="扫码登录", height=350, width=350, @@ -916,50 +1249,75 @@ with gr.Blocks( # -------- Tab 5: 数据看板 -------- with gr.Tab("📊 数据看板"): - gr.Markdown("### 账号数据概览") + gr.Markdown( + "### 我的账号数据看板\n" + "> 用户 ID 和 xsec_token 从「账号登录」自动获取,直接点击加载即可" + ) - # ---- 用户选择器 ---- - gr.Markdown("#### 🔍 快速选择用户 (从推荐列表提取)") - with gr.Row(): - btn_fetch_users = gr.Button( - "👥 从推荐获取用户", variant="primary", - ) - user_selector = gr.Dropdown( - label="选择用户 (自动填充下方 ID)", - choices=[], interactive=True, - ) - user_fetch_status = gr.Markdown("") - - gr.Markdown("---") with gr.Row(): with gr.Column(scale=1): - data_user_id = gr.Textbox(label="用户 ID") - data_xsec_token = gr.Textbox(label="xsec_token") - btn_load_profile = gr.Button("📊 加载用户数据", variant="primary") + data_user_id = gr.Textbox( + label="我的用户 ID (自动填充)", + value=config.get("my_user_id", ""), + interactive=True, + ) + data_xsec_token = gr.Textbox( + label="xsec_token (自动填充)", + value=config.get("xsec_token", ""), + interactive=True, + ) + btn_refresh_token = gr.Button( + "🔄 刷新 Token", variant="secondary", + ) + btn_load_my_data = gr.Button( + "📊 加载我的数据", variant="primary", size="lg", + ) data_status = gr.Markdown("") - gr.Markdown("---") - btn_load_feeds = gr.Button("🏠 查看首页推荐", variant="secondary") - feeds_status = gr.Markdown("") - with gr.Column(scale=2): - profile_display = gr.TextArea( - label="用户信息 & 笔记数据", lines=15, - interactive=False, - ) - feeds_display = gr.TextArea( - label="首页推荐", lines=10, interactive=False, + profile_card = gr.Markdown( + value="*等待加载...*", + label="账号概览", ) + gr.Markdown("---") + gr.Markdown("### 📈 数据可视化") + with gr.Row(): + with gr.Column(scale=1): + chart_interact = gr.Plot(label="📊 核心指标") + with gr.Column(scale=2): + chart_notes = gr.Plot(label="❤ 笔记点赞排行") + + gr.Markdown("---") + notes_detail = gr.Markdown( + value="*加载数据后显示笔记明细表格*", + label="笔记数据明细", + ) + # ================================================== # 事件绑定 # ================================================== - # ---- 全局设置 ---- + # ---- 全局设置: LLM 提供商管理 ---- btn_connect_llm.click( - fn=connect_llm, inputs=[api_key, base_url], + fn=connect_llm, inputs=[llm_provider], outputs=[llm_model, status_bar], ) + llm_provider.change( + fn=on_provider_selected, + inputs=[llm_provider], + outputs=[llm_provider_info], + ) + btn_add_provider.click( + fn=add_llm_provider, + inputs=[new_provider_name, new_provider_key, new_provider_url], + outputs=[llm_provider, provider_mgmt_status], + ) + btn_del_provider.click( + fn=remove_llm_provider, + inputs=[llm_provider], + outputs=[llm_provider, provider_mgmt_status], + ) btn_connect_sd.click( fn=connect_sd, inputs=[sd_url], outputs=[sd_model, status_bar], @@ -972,7 +1330,7 @@ with gr.Blocks( # ---- Tab 1: 内容创作 ---- btn_gen_copy.click( fn=generate_copy, - inputs=[api_key, base_url, llm_model, topic, style], + inputs=[llm_model, topic, style], outputs=[res_title, res_content, res_prompt, res_tags, status_bar], ) @@ -1008,13 +1366,13 @@ with gr.Blocks( btn_analyze.click( fn=analyze_and_suggest, - inputs=[api_key, base_url, llm_model, hot_keyword, search_output], + inputs=[llm_model, hot_keyword, search_output], outputs=[analysis_status, analysis_output, topic_from_hot], ) btn_gen_from_hot.click( fn=generate_from_hotspot, - inputs=[api_key, base_url, llm_model, topic_from_hot, hot_style, search_output], + inputs=[llm_model, topic_from_hot, hot_style, search_output], outputs=[hot_title, hot_content, hot_prompt, hot_tags, hot_gen_status], ) @@ -1045,7 +1403,7 @@ with gr.Blocks( ) btn_pro_ai.click( fn=ai_generate_comment, - inputs=[api_key, base_url, llm_model, persona, + inputs=[llm_model, persona, pro_title, pro_content, pro_comments], outputs=[pro_comment_text, pro_ai_status], ) @@ -1058,7 +1416,7 @@ with gr.Blocks( # == 子 Tab B: 回复粉丝评论 == btn_my_fetch.click( fn=fetch_my_notes, - inputs=[my_keyword, mcp_url], + inputs=[mcp_url], outputs=[my_selector, my_fetch_status], ) my_selector.change( @@ -1073,7 +1431,7 @@ with gr.Blocks( ) btn_my_ai_reply.click( fn=ai_reply_comment, - inputs=[api_key, base_url, llm_model, persona, + inputs=[llm_model, persona, my_title, my_target_comment], outputs=[my_reply_content, my_reply_gen_status], ) @@ -1092,33 +1450,36 @@ with gr.Blocks( btn_check_login.click( fn=check_login, inputs=[mcp_url], + outputs=[login_status, login_user_id, login_xsec_token], + ) + btn_logout.click( + fn=logout_xhs, + inputs=[mcp_url], outputs=[login_status], ) + btn_save_uid.click( + fn=save_my_user_id, + inputs=[login_user_id], + outputs=[save_uid_status], + ) # ---- Tab 5: 数据看板 ---- - # 从推荐获取用户列表 - btn_fetch_users.click( - fn=fetch_user_list_from_feeds, - inputs=[mcp_url], - outputs=[user_selector, user_fetch_status], - ) - # 选择用户 -> 自动填充 user_id / xsec_token - user_selector.change( - fn=on_user_selected, - inputs=[user_selector], - outputs=[data_user_id, data_xsec_token], - ) + def refresh_xsec_token(mcp_url): + token = _auto_fetch_xsec_token(mcp_url) + if token: + cfg.set("xsec_token", token) + return gr.update(value=token), "✅ Token 已刷新" + return gr.update(value=cfg.get("xsec_token", "")), "❌ 刷新失败,请确认已登录" - btn_load_profile.click( - fn=fetch_user_data, + btn_refresh_token.click( + fn=refresh_xsec_token, + inputs=[mcp_url], + outputs=[data_xsec_token, data_status], + ) + btn_load_my_data.click( + fn=fetch_my_profile, inputs=[data_user_id, data_xsec_token, mcp_url], - outputs=[data_status, profile_display], - ) - - btn_load_feeds.click( - fn=fetch_homepage_feeds, - inputs=[mcp_url], - outputs=[feeds_status, feeds_display], + outputs=[data_status, profile_card, chart_interact, chart_notes, notes_detail], ) # ---- 启动时自动刷新 SD ---- diff --git a/mcp_client.py b/mcp_client.py index 13c1cd0..d5ee61b 100644 --- a/mcp_client.py +++ b/mcp_client.py @@ -16,6 +16,17 @@ logger = logging.getLogger(__name__) MCP_DEFAULT_URL = "http://localhost:18060/mcp" MCP_TIMEOUT = 60 # 秒 +# 全局客户端缓存 —— 同一 URL 复用同一实例,避免反复 initialize +_client_cache: dict[str, "MCPClient"] = {} + + +def get_mcp_client(base_url: str = MCP_DEFAULT_URL) -> "MCPClient": + """获取 MCP 客户端(单例),同一 URL 复用同一实例""" + if base_url not in _client_cache: + _client_cache[base_url] = MCPClient(base_url) + client = _client_cache[base_url] + return client + class MCPClient: """小红书 MCP 服务的 HTTP 客户端封装""" @@ -29,14 +40,22 @@ class MCPClient: # ---------- 底层通信 ---------- - def _call(self, method: str, params: dict = None) -> dict: - """发送 JSON-RPC 请求到 MCP 服务""" + def _call(self, method: str, params: dict = None, *, + is_notification: bool = False) -> dict: + """发送 JSON-RPC 请求到 MCP 服务 + + Args: + is_notification: 若为 True 则不带 id(JSON-RPC 通知) + """ payload = { "jsonrpc": "2.0", "method": method, "params": params or {}, - "id": str(uuid.uuid4()), } + # JSON-RPC 通知不带 id + if not is_notification: + payload["id"] = str(uuid.uuid4()) + headers = {} if self._session_id: headers["mcp-session-id"] = self._session_id @@ -50,6 +69,11 @@ class MCPClient: self._session_id = resp.headers["mcp-session-id"] resp.raise_for_status() + + # 通知不一定有响应体 + if is_notification: + return {"status": "notified"} + data = resp.json() if "error" in data: logger.error("MCP error: %s", data["error"]) @@ -74,19 +98,38 @@ class MCPClient: "clientInfo": {"name": "xhs-autobot", "version": "2.0.0"} }) if "error" not in result: - # 发送 initialized 通知 - self._call("notifications/initialized", {}) + # 发送 initialized 通知(JSON-RPC 通知不带 id) + self._call("notifications/initialized", {}, + is_notification=True) self._initialized = True return result return {"status": "already_initialized"} + def _reset(self): + """重置初始化状态(下次调用会重新握手)""" + self._initialized = False + self._session_id = None + def _call_tool(self, tool_name: str, arguments: dict = None) -> dict: - """调用 MCP 工具""" + """调用 MCP 工具,400 错误时自动重试一次""" self._ensure_initialized() result = self._call("tools/call", { "name": tool_name, "arguments": arguments or {} }) + + # 如果返回 400 相关错误,重置并重试一次 + if isinstance(result, dict) and "error" in result: + err_msg = str(result["error"]) + if "400" in err_msg or "Bad Request" in err_msg: + logger.warning("MCP 400 错误,重置会话后重试: %s", tool_name) + self._reset() + self._ensure_initialized() + result = self._call("tools/call", { + "name": tool_name, + "arguments": arguments or {} + }) + # 提取文本和图片内容 if isinstance(result, dict) and "content" in result: texts = [] @@ -319,3 +362,9 @@ class MCPClient: "user_id": user_id, "xsec_token": xsec_token, }) + + # ---------- 登录管理 ---------- + + def delete_cookies(self) -> dict: + """删除 cookies,重置登录状态""" + return self._call_tool("delete_cookies", {}) diff --git a/requirements.txt b/requirements.txt index 2fc6975..d926e9c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ gradio>=4.0.0 requests>=2.28.0 Pillow>=9.0.0 +matplotlib>=3.5.0