xhs_factory/mcp_client.py
zhoujie 88dfc09e2a feat(config): 新增多 LLM 提供商支持与账号数据看板
- 新增多 LLM 提供商管理功能,支持添加、删除和切换不同 API 提供商
- 新增账号数据看板,支持可视化展示用户核心指标和笔记点赞排行
- 新增自动获取并保存 xsec_token 功能,提升登录体验
- 新增退出登录功能,支持重新扫码登录
- 新增用户 ID 验证和保存功能,确保账号信息准确性

♻️ refactor(config): 重构配置管理和 LLM 服务调用

- 重构配置管理器,支持多 LLM 提供商配置和兼容旧配置自动迁移
- 重构 LLM 服务调用逻辑,统一从配置管理器获取激活的提供商信息
- 重构 MCP 客户端,增加单例模式和自动重试机制,提升连接稳定性
- 重构数据看板页面,优化用户数据获取和可视化展示逻辑

🐛 fix(mcp): 修复 MCP 连接和登录状态检查问题

- 修复 MCP 客户端初始化问题,避免重复握手
- 修复登录状态检查逻辑,自动获取并保存 xsec_token
- 修复获取我的笔记列表功能,支持通过用户 ID 准确获取
- 修复 JSON-RPC 通知格式问题,确保与 MCP 服务兼容

📝 docs(config): 更新配置文件和代码注释

- 更新配置文件结构,新增多 LLM 提供商配置字段
- 更新代码注释,明确各功能模块的作用和调用方式
- 更新用户界面提示信息,提供更清晰的操作指引
2026-02-08 21:52:29 +08:00

371 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
小红书 MCP HTTP 客户端
封装对 xiaohongshu-mcp 服务 (http://localhost:18060/mcp) 的调用
"""
import requests
import json
import logging
import uuid
import base64
import re
import io
from PIL import Image
logger = logging.getLogger(__name__)
MCP_DEFAULT_URL = "http://localhost:18060/mcp"
MCP_TIMEOUT = 60 # seconds
# Global client cache: the same URL reuses the same instance, which avoids
# repeating the MCP "initialize" handshake.
_client_cache: dict[str, "MCPClient"] = {}
def get_mcp_client(base_url: str = MCP_DEFAULT_URL) -> "MCPClient":
    """Return the shared MCP client for ``base_url``.

    The first request for a given URL creates an ``MCPClient`` and stores it
    in the module-level cache; later requests for the same URL get that same
    instance back.
    """
    cached = _client_cache.get(base_url)
    if cached is None:
        # First request for this URL: create the client and remember it.
        cached = MCPClient(base_url)
        _client_cache[base_url] = cached
    return cached
class MCPClient:
    """HTTP client wrapper for the Xiaohongshu MCP service.

    Requests are JSON-RPC 2.0 messages POSTed to ``base_url``. The
    ``mcp-session-id`` response header, when present, is remembered and sent
    back on later requests. Failures are reported as ``{"error": ...}``
    dictionaries rather than raised.
    """

    def __init__(self, base_url: str = MCP_DEFAULT_URL):
        """Create a client bound to ``base_url`` (no network traffic yet)."""
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({"Content-Type": "application/json"})
        self._session_id = None
        self._initialized = False

    # ---------- Low-level transport ----------
    def _call(self, method: str, params: dict = None, *,
              is_notification: bool = False) -> dict:
        """Send one JSON-RPC message to the MCP service.

        Args:
            method: JSON-RPC method name.
            params: Method parameters; ``None`` is sent as ``{}``.
            is_notification: When True the message carries no ``id``
                (a JSON-RPC notification) and the response body is not read.

        Returns:
            ``{"status": "notified"}`` for notifications; otherwise the
            ``result`` member of the response (or the whole response when it
            has no ``result``). Any failure yields ``{"error": ...}``.
        """
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
        }
        # JSON-RPC notifications must not carry an id.
        if not is_notification:
            payload["id"] = str(uuid.uuid4())
        headers = {}
        if self._session_id:
            headers["mcp-session-id"] = self._session_id
        try:
            resp = self.session.post(
                self.base_url, json=payload, timeout=MCP_TIMEOUT, headers=headers
            )
            # Remember the session id before checking the status code.
            if "mcp-session-id" in resp.headers:
                self._session_id = resp.headers["mcp-session-id"]
            resp.raise_for_status()
            # A notification does not necessarily have a response body.
            if is_notification:
                return {"status": "notified"}
            data = resp.json()
            if "error" in data:
                logger.error("MCP error: %s", data["error"])
                return {"error": data["error"]}
            return data.get("result", data)
        except requests.exceptions.ConnectionError:
            logger.error("MCP 服务未启动或无法连接: %s", self.base_url)
            return {"error": "MCP 服务未启动,请先启动 xiaohongshu-mcp"}
        except requests.exceptions.Timeout:
            logger.error("MCP 请求超时")
            return {"error": "MCP 请求超时,请稍后重试"}
        except Exception as e:
            # Includes HTTP status errors raised by raise_for_status(); their
            # message text is what _call_tool inspects for "400".
            logger.error("MCP 调用异常: %s", e)
            return {"error": str(e)}

    def _ensure_initialized(self):
        """Perform the MCP initialize handshake once per session.

        Returns the ``initialize`` result (or error dict) when a handshake was
        attempted, or ``{"status": "already_initialized"}`` otherwise.
        """
        if not self._initialized:
            result = self._call("initialize", {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "xhs-autobot", "version": "2.0.0"}
            })
            if "error" not in result:
                # Follow up with the "initialized" notification (no id).
                self._call("notifications/initialized", {},
                           is_notification=True)
                self._initialized = True
            return result
        return {"status": "already_initialized"}

    def _reset(self):
        """Forget the handshake state so the next call re-initializes."""
        self._initialized = False
        self._session_id = None

    def _call_tool(self, tool_name: str, arguments: dict = None) -> dict:
        """Invoke an MCP tool, retrying once after a 400-style error.

        Returns:
            When the result has a ``content`` member:
            ``{"success", "text", "raw"}`` plus ``"images"`` (base64 strings)
            if any image items were present. ``success`` is False when the
            result carries a truthy ``isError`` flag. Otherwise the raw
            result (typically an ``{"error": ...}`` dict) is returned as is.
        """
        self._ensure_initialized()
        request = {"name": tool_name, "arguments": arguments or {}}
        result = self._call("tools/call", request)
        # On a 400-related error, reset the session and retry exactly once.
        if isinstance(result, dict) and "error" in result:
            err_msg = str(result["error"])
            if "400" in err_msg or "Bad Request" in err_msg:
                logger.warning("MCP 400 错误,重置会话后重试: %s", tool_name)
                self._reset()
                self._ensure_initialized()
                result = self._call("tools/call", request)
        if not (isinstance(result, dict) and "content" in result):
            return result

        # Collect text and image items; tolerate malformed content.
        content = result["content"]
        if not isinstance(content, list):
            content = []
        texts = []
        images = []
        for item in content:
            if not isinstance(item, dict):
                continue
            item_type = item.get("type")
            if item_type == "text":
                text = item.get("text")
                if text is not None:
                    texts.append(str(text))
            elif item_type == "image":
                # Base64-encoded image data returned by MCP.
                img_data = item.get("data", "")
                if img_data:
                    images.append(img_data)
        out = {
            # A tool-level failure is flagged via "isError" in the result.
            "success": not result.get("isError", False),
            "text": "\n".join(texts),
            "raw": result,
        }
        if images:
            out["images"] = images
        return out

    # ---------- Login ----------
    def get_login_qrcode(self) -> dict:
        """Fetch the login QR code.

        Returns the tool result with an extra ``qr_image`` key holding a
        ``PIL.Image`` decoded from the first returned image, or ``None`` when
        no image was returned or decoding failed. Error dicts are returned
        unchanged.
        """
        result = self._call_tool("get_login_qrcode")
        if "error" in result:
            return result
        # Try to decode the first base64 image.
        qr_image = None
        if "images" in result and result["images"]:
            try:
                img_bytes = base64.b64decode(result["images"][0])
                qr_image = Image.open(io.BytesIO(img_bytes))
            except Exception as e:
                logger.warning("二维码图片解析失败: %s", e)
        result["qr_image"] = qr_image
        return result

    def check_login_status(self) -> dict:
        """Call the ``check_login_status`` tool and return its result."""
        return self._call_tool("check_login_status")

    # ---------- Connection status ----------
    def check_connection(self) -> tuple[bool, str]:
        """Check whether the MCP service is reachable.

        Returns:
            ``(False, message)`` when the ``check_login_status`` call reports
            an error, else ``(True, text)``. The message is always a string:
            for a JSON-RPC error object its ``message`` field is used.
        """
        result = self._call_tool("check_login_status")
        if "error" in result:
            err = result["error"]
            if isinstance(err, dict):
                err = err.get("message") or str(err)
            return False, str(err)
        return True, result.get("text", "已连接")

    # ---------- Search ----------
    def search_feeds(self, keyword: str, sort_by: str = "综合",
                     note_type: str = "不限", publish_time: str = "不限") -> dict:
        """Call the ``search_feeds`` tool with a keyword and filters."""
        args = {
            "keyword": keyword,
            "filters": {
                "sort_by": sort_by,
                "note_type": note_type,
                "publish_time": publish_time,
            }
        }
        return self._call_tool("search_feeds", args)

    # ---------- Recommended feed ----------
    def list_feeds(self) -> dict:
        """Call the ``list_feeds`` tool and return its result."""
        return self._call_tool("list_feeds")

    # ---------- Feed list parsing ----------
    @staticmethod
    def _parse_feed_entries(text: str) -> list[dict]:
        """Parse feed entries out of MCP response text.

        JSON is tried first (either a list of feeds or an object with a
        ``feeds`` list). If that yields nothing, fields are extracted with
        regular expressions and paired up by position.

        Returns:
            A list of dicts with keys ``feed_id``, ``xsec_token``, ``title``,
            ``author``, ``user_id``, ``likes`` and ``type``; empty when
            nothing could be parsed or ``text`` is empty / not a string.
        """
        if not isinstance(text, str) or not text:
            return []

        # Strategy 1: direct JSON parsing.
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            data = None
        feeds = []
        if isinstance(data, dict) and "feeds" in data:
            feeds = data["feeds"]
        elif isinstance(data, list):
            feeds = data
        if not isinstance(feeds, list):
            feeds = []
        # Built in its own list so that a partially usable JSON payload can
        # never leak entries into the regex fallback below.
        json_entries = []
        for feed in feeds:
            if not isinstance(feed, dict):
                continue
            # "or {}" also covers JSON null values.
            note = feed.get("noteCard") or {}
            if not isinstance(note, dict):
                note = {}
            user = note.get("user") or {}
            if not isinstance(user, dict):
                user = {}
            interact = note.get("interactInfo") or {}
            if not isinstance(interact, dict):
                interact = {}
            json_entries.append({
                "feed_id": feed.get("id", ""),
                "xsec_token": feed.get("xsecToken", ""),
                "title": note.get("displayTitle", "未知标题"),
                "author": user.get("nickname", user.get("nickName", "")),
                "user_id": user.get("userId", ""),
                "likes": interact.get("likedCount", "0"),
                "type": note.get("type", ""),
            })
        if json_entries:
            return json_entries

        # Strategy 2: regex extraction from free-form text.
        # Feed id: 24 hex digits. The lookbehind stops the bare "id"
        # alternative from matching the tail of "userId" / "user_id", which
        # would otherwise report user ids as feed ids.
        feed_ids = re.findall(
            r'(?<![A-Za-z_])(?:feed_?id|note_?id|id)["\s:]+([0-9a-f]{24})',
            text, re.I)
        # xsecToken
        tokens = re.findall(r'(?:xsec_?[Tt]oken)["\s:]+([A-Za-z0-9+/=_-]{20,})', text, re.I)
        # Title (rest of the line)
        titles = re.findall(r'(?:title|标题)["\s:]+(.+?)(?:\n|$)', text, re.I)
        # userId: 24 hex digits
        user_ids = re.findall(r'(?:user_?[Ii]d|userId)["\s:]+([0-9a-f]{24})', text, re.I)
        entries = []
        count = max(len(feed_ids), len(tokens))
        for i in range(count):
            entries.append({
                "feed_id": feed_ids[i] if i < len(feed_ids) else "",
                "xsec_token": tokens[i] if i < len(tokens) else "",
                "title": titles[i].strip() if i < len(titles) else f"笔记 {i+1}",
                "author": "",
                "user_id": user_ids[i] if i < len(user_ids) else "",
                "likes": "",
                "type": "",
            })
        return entries

    def list_feeds_parsed(self) -> list[dict]:
        """Fetch the recommended feed and parse it into a list of entries.

        Returns an empty list when the call reports an error.
        """
        result = self.list_feeds()
        if "error" in result:
            return []
        return self._parse_feed_entries(result.get("text", ""))

    def search_feeds_parsed(self, keyword: str, sort_by: str = "综合") -> list[dict]:
        """Search feeds and parse the result into a list of entries.

        Returns an empty list when the call reports an error.
        """
        result = self.search_feeds(keyword, sort_by=sort_by)
        if "error" in result:
            return []
        return self._parse_feed_entries(result.get("text", ""))

    # ---------- Feed detail ----------
    def get_feed_detail(self, feed_id: str, xsec_token: str,
                        load_all_comments: bool = False) -> dict:
        """Call the ``get_feed_detail`` tool for one feed."""
        args = {
            "feed_id": feed_id,
            "xsec_token": xsec_token,
            "load_all_comments": load_all_comments,
        }
        return self._call_tool("get_feed_detail", args)

    # ---------- Publishing ----------
    def publish_content(self, title: str, content: str, images: list[str],
                        tags: list[str] = None, schedule_at: str = None) -> dict:
        """Call the ``publish_content`` tool (image-and-text post).

        ``tags`` and ``schedule_at`` are only sent when truthy.
        """
        args = {
            "title": title,
            "content": content,
            "images": images,
        }
        if tags:
            args["tags"] = tags
        if schedule_at:
            args["schedule_at"] = schedule_at
        return self._call_tool("publish_content", args)

    def publish_video(self, title: str, content: str, video_path: str,
                      tags: list[str] = None, schedule_at: str = None) -> dict:
        """Call the ``publish_with_video`` tool (video post).

        ``tags`` and ``schedule_at`` are only sent when truthy.
        """
        args = {
            "title": title,
            "content": content,
            "video": video_path,
        }
        if tags:
            args["tags"] = tags
        if schedule_at:
            args["schedule_at"] = schedule_at
        return self._call_tool("publish_with_video", args)

    # ---------- Comments ----------
    def post_comment(self, feed_id: str, xsec_token: str, comment: str) -> dict:
        """Call the ``post_comment_to_feed`` tool; ``comment`` is sent as ``content``."""
        return self._call_tool("post_comment_to_feed", {
            "feed_id": feed_id,
            "xsec_token": xsec_token,
            "content": comment,
        })

    def reply_comment(self, feed_id: str, xsec_token: str,
                      comment_id: str, user_id: str, content: str) -> dict:
        """Call the ``reply_comment_in_feed`` tool to reply to a comment."""
        return self._call_tool("reply_comment_in_feed", {
            "feed_id": feed_id,
            "xsec_token": xsec_token,
            "comment_id": comment_id,
            "user_id": user_id,
            "content": content,
        })

    # ---------- Interaction ----------
    def like_feed(self, feed_id: str, xsec_token: str, unlike: bool = False) -> dict:
        """Call the ``like_feed`` tool; ``unlike`` is forwarded as given."""
        return self._call_tool("like_feed", {
            "feed_id": feed_id,
            "xsec_token": xsec_token,
            "unlike": unlike,
        })

    def favorite_feed(self, feed_id: str, xsec_token: str,
                      unfavorite: bool = False) -> dict:
        """Call the ``favorite_feed`` tool; ``unfavorite`` is forwarded as given."""
        return self._call_tool("favorite_feed", {
            "feed_id": feed_id,
            "xsec_token": xsec_token,
            "unfavorite": unfavorite,
        })

    # ---------- Users ----------
    def get_user_profile(self, user_id: str, xsec_token: str) -> dict:
        """Call the ``user_profile`` tool for the given user."""
        return self._call_tool("user_profile", {
            "user_id": user_id,
            "xsec_token": xsec_token,
        })

    # ---------- Login management ----------
    def delete_cookies(self) -> dict:
        """Call the ``delete_cookies`` tool (delete cookies to reset the login state)."""
        return self._call_tool("delete_cookies", {})