xhs_factory/services/mcp_client.py
zhoujie 2ba87c8f6e
Some checks failed
CI / Lint (ruff) (push) Has been cancelled
CI / Import Check (push) Has been cancelled
📝 docs(project): 添加开源社区标准文档与 CI 工作流
- 新增 GitHub Issue 模板(Bug 报告、功能请求)和 Pull Request 模板
- 新增 Code of Conduct(贡献者行为准则)和 Security Policy(安全政策)
- 新增 CI 工作流(GitHub Actions),包含 ruff 代码检查和导入验证
- 新增开发依赖文件 requirements-dev.txt

📦 build(ci): 配置 GitHub Actions 持续集成

- 在 push 到 main 分支和 pull request 时自动触发 CI
- 添加 lint 任务执行 ruff 代码风格检查
- 添加 import-check 任务验证核心服务模块导入

♻️ refactor(structure): 重构项目目录结构

- 将根目录的 6 个服务模块迁移至 services/ 包
- 更新所有相关文件的导入语句(main.py、ui/、services/)
- 根目录仅保留 main.py 作为唯一 Python 入口文件

🔧 chore(config): 调整配置和资源文件路径

- 将 config.json 移至 config/ 目录,更新相关引用
- 将个人头像图片移至 assets/faces/ 目录,更新 .gitignore
- 更新 Dockerfile 和 docker-compose.yml 中的配置路径

📝 docs(readme): 完善 README 文档

- 添加项目状态徽章(Python 版本、License、CI)
- 更新项目结构图反映实际目录布局
- 修正使用指南中的 Tab 名称和操作路径
- 替换 your-username 占位符为格式提示

🗑️ chore(cleanup): 清理冗余文件

- 删除旧版备份文件、测试脚本、临时记录和运行日志
- 删除散落的个人图片文件(已归档至 assets/faces/)
2026-02-27 22:12:39 +08:00

482 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
小红书 MCP HTTP 客户端
封装对 xiaohongshu-mcp 服务 (http://localhost:18060/mcp) 的调用
"""
import requests
import json
import logging
import uuid
import base64
import re
import io
from PIL import Image
logger = logging.getLogger(__name__)
MCP_DEFAULT_URL = "http://localhost:18060/mcp"
MCP_TIMEOUT = 60 # 秒
# 全局客户端缓存 —— 同一 URL 复用同一实例,避免反复 initialize
_client_cache: dict[str, "MCPClient"] = {}
def get_mcp_client(base_url: str = MCP_DEFAULT_URL) -> "MCPClient":
"""获取 MCP 客户端(单例),同一 URL 复用同一实例"""
if base_url not in _client_cache:
_client_cache[base_url] = MCPClient(base_url)
client = _client_cache[base_url]
return client
class MCPClient:
"""小红书 MCP 服务的 HTTP 客户端封装"""
def __init__(self, base_url: str = MCP_DEFAULT_URL):
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({"Content-Type": "application/json"})
self._session_id = None
self._initialized = False
# ---------- 底层通信 ----------
def _call(self, method: str, params: dict = None, *,
is_notification: bool = False) -> dict:
"""发送 JSON-RPC 请求到 MCP 服务
Args:
is_notification: 若为 True 则不带 idJSON-RPC 通知)
"""
payload = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
}
# JSON-RPC 通知不带 id
if not is_notification:
payload["id"] = str(uuid.uuid4())
headers = {}
if self._session_id:
headers["mcp-session-id"] = self._session_id
try:
resp = self.session.post(
self.base_url, json=payload, timeout=MCP_TIMEOUT, headers=headers
)
# 保存 session id
if "mcp-session-id" in resp.headers:
self._session_id = resp.headers["mcp-session-id"]
resp.raise_for_status()
# 通知不一定有响应体
if is_notification:
return {"status": "notified"}
data = resp.json()
if "error" in data:
logger.error("MCP error: %s", data["error"])
return {"error": data["error"]}
return data.get("result", data)
except requests.exceptions.ConnectionError:
logger.error("MCP 服务未启动或无法连接: %s", self.base_url)
return {"error": "MCP 服务未启动,请先启动 xiaohongshu-mcp"}
except requests.exceptions.Timeout:
logger.error("MCP 请求超时")
return {"error": "MCP 请求超时,请稍后重试"}
except Exception as e:
logger.error("MCP 调用异常: %s", e)
return {"error": str(e)}
def _ensure_initialized(self):
"""确保 MCP 连接已初始化"""
if not self._initialized:
result = self._call("initialize", {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "xhs-autobot", "version": "2.0.0"}
})
if "error" not in result:
# 发送 initialized 通知JSON-RPC 通知不带 id
self._call("notifications/initialized", {},
is_notification=True)
self._initialized = True
return result
return {"status": "already_initialized"}
def _reset(self):
"""重置初始化状态(下次调用会重新握手)"""
self._initialized = False
self._session_id = None
def _call_tool(self, tool_name: str, arguments: dict = None) -> dict:
"""调用 MCP 工具400 错误时自动重试一次"""
self._ensure_initialized()
result = self._call("tools/call", {
"name": tool_name,
"arguments": arguments or {}
})
# 如果返回 400 相关错误,重置并重试一次
if isinstance(result, dict) and "error" in result:
err_msg = str(result["error"])
if "400" in err_msg or "Bad Request" in err_msg:
logger.warning("MCP 400 错误,重置会话后重试: %s", tool_name)
self._reset()
self._ensure_initialized()
result = self._call("tools/call", {
"name": tool_name,
"arguments": arguments or {}
})
# 提取文本和图片内容
if isinstance(result, dict) and "content" in result:
texts = []
images = []
for item in result["content"]:
if item.get("type") == "text":
texts.append(item["text"])
elif item.get("type") == "image":
# MCP 返回的 base64 图片
img_data = item.get("data", "")
if img_data:
images.append(img_data)
out = {"success": True, "text": "\n".join(texts), "raw": result}
if images:
out["images"] = images
return out
return result
# ---------- 登录 ----------
def get_login_qrcode(self) -> dict:
"""获取登录二维码,返回 {success, text, qr_image(PIL.Image)}"""
result = self._call_tool("get_login_qrcode")
if "error" in result:
return result
# 尝试解析 base64 图片
qr_image = None
if "images" in result and result["images"]:
try:
img_bytes = base64.b64decode(result["images"][0])
qr_image = Image.open(io.BytesIO(img_bytes))
except Exception as e:
logger.warning("二维码图片解析失败: %s", e)
result["qr_image"] = qr_image
return result
def check_login_status(self) -> dict:
"""检查小红书登录状态"""
return self._call_tool("check_login_status")
# ---------- 连接状态 ----------
def check_connection(self) -> tuple[bool, str]:
"""检查 MCP 服务是否可连接"""
result = self._call_tool("check_login_status")
if "error" in result:
return False, result["error"]
return True, result.get("text", "已连接")
# ---------- 搜索 ----------
def search_feeds(self, keyword: str, sort_by: str = "综合",
note_type: str = "不限", publish_time: str = "不限") -> dict:
"""搜索小红书内容"""
args = {
"keyword": keyword,
"filters": {
"sort_by": sort_by,
"note_type": note_type,
"publish_time": publish_time,
}
}
return self._call_tool("search_feeds", args)
# ---------- 推荐列表 ----------
def list_feeds(self) -> dict:
"""获取首页推荐列表"""
return self._call_tool("list_feeds")
# ---------- 笔记列表解析 ----------
@staticmethod
def _parse_feed_entries(text: str) -> list[dict]:
"""从 MCP 返回文本中解析笔记条目为结构化列表"""
entries = []
# 方式1: 尝试直接 JSON 解析
try:
data = json.loads(text)
feeds = []
if isinstance(data, dict) and "feeds" in data:
feeds = data["feeds"]
elif isinstance(data, list):
feeds = data
for feed in feeds:
note = feed.get("noteCard", {})
user = note.get("user", {})
interact = note.get("interactInfo", {})
entries.append({
"feed_id": feed.get("id", ""),
"xsec_token": feed.get("xsecToken", ""),
"title": note.get("displayTitle", "未知标题"),
"author": user.get("nickname", user.get("nickName", "")),
"user_id": user.get("userId", ""),
"likes": interact.get("likedCount", "0"),
"type": note.get("type", ""),
})
if entries:
return entries
except (json.JSONDecodeError, TypeError, AttributeError):
pass
# 方式2: 正则提取 —— 适配 MCP 的文本格式
# 匹配 feed_id (24位十六进制)
feed_ids = re.findall(r'(?:feed_id|id)["\s:]+([0-9a-f]{24})', text, re.I)
# 匹配 xsecToken
tokens = re.findall(r'(?:xsec_?[Tt]oken)["\s:]+([A-Za-z0-9+/=_-]{20,})', text, re.I)
# 匹配标题
titles = re.findall(r'(?:title|标题)["\s:]+(.+?)(?:\n|$)', text, re.I)
# 匹配 userId
user_ids = re.findall(r'(?:user_?[Ii]d|userId)["\s:]+([0-9a-f]{24})', text, re.I)
count = max(len(feed_ids), len(tokens))
for i in range(count):
entries.append({
"feed_id": feed_ids[i] if i < len(feed_ids) else "",
"xsec_token": tokens[i] if i < len(tokens) else "",
"title": titles[i].strip() if i < len(titles) else f"笔记 {i+1}",
"author": "",
"user_id": user_ids[i] if i < len(user_ids) else "",
"likes": "",
"type": "",
})
return entries
def list_feeds_parsed(self) -> list[dict]:
"""获取首页推荐并解析为结构化列表"""
result = self.list_feeds()
if "error" in result:
return []
return self._parse_feed_entries(result.get("text", ""))
def search_feeds_parsed(self, keyword: str, sort_by: str = "综合") -> list[dict]:
"""搜索笔记并解析为结构化列表"""
result = self.search_feeds(keyword, sort_by=sort_by)
if "error" in result:
return []
return self._parse_feed_entries(result.get("text", ""))
@staticmethod
def _extract_comment_obj(c: dict) -> dict:
"""从单个评论 JSON 对象提取结构化数据"""
user_info = c.get("userInfo") or c.get("user") or {}
return {
"comment_id": str(c.get("id", c.get("commentId", ""))),
"user_id": user_info.get("userId", user_info.get("user_id", "")),
"nickname": user_info.get("nickname", user_info.get("nickName", "未知")),
"content": c.get("content", ""),
"sub_comment_count": c.get("subCommentCount", 0),
}
@staticmethod
def _find_comment_list(data: dict) -> list:
"""在多种嵌套结构中定位评论列表"""
if not isinstance(data, dict):
return []
# 格式1: {"data": {"comments": {"list": [...]}}} —— 实际 MCP 返回
d = data.get("data", {})
if isinstance(d, dict):
cm = d.get("comments", {})
if isinstance(cm, dict) and "list" in cm:
return cm["list"]
if isinstance(cm, list):
return cm
# 格式2: {"comments": {"list": [...]}}
cm = data.get("comments", {})
if isinstance(cm, dict) and "list" in cm:
return cm["list"]
if isinstance(cm, list):
return cm
# 格式3: {"data": [{...}, ...]} (直接列表)
if isinstance(d, list):
return d
return []
@classmethod
def _parse_comments(cls, text: str) -> list[dict]:
"""从笔记详情文本中解析评论列表为结构化数据
返回: [{comment_id, user_id, nickname, content, sub_comment_count}, ...]
"""
comments = []
# 方式1: 尝试 JSON 解析(支持多种嵌套格式)
try:
data = json.loads(text)
raw_comments = []
if isinstance(data, list):
raw_comments = data
elif isinstance(data, dict):
raw_comments = cls._find_comment_list(data)
for c in raw_comments:
if isinstance(c, dict) and c.get("content"):
comments.append(cls._extract_comment_obj(c))
if comments:
return comments
except (json.JSONDecodeError, TypeError, AttributeError):
pass
# 方式2: 正则提取 —— 仅当 JSON 完全失败时使用
# 逐个评论块提取,避免跨评论字段错位
# 匹配 JSON 对象中相邻的 id + content + userInfo 组合
comment_blocks = re.finditer(
r'"id"\s*:\s*"([0-9a-fA-F]{20,26})"[^}]*?'
r'"content"\s*:\s*"([^"]{1,500})"[^}]*?'
r'"userInfo"\s*:\s*\{[^}]*?"userId"\s*:\s*"([0-9a-fA-F]{20,26})"'
r'[^}]*?"nickname"\s*:\s*"([^"]{1,30})"',
text, re.DOTALL
)
for m in comment_blocks:
comments.append({
"comment_id": m.group(1),
"user_id": m.group(3),
"nickname": m.group(4),
"content": m.group(2),
"sub_comment_count": 0,
})
return comments
# ---------- 帖子详情 ----------
def get_feed_detail(self, feed_id: str, xsec_token: str,
load_all_comments: bool = False) -> dict:
"""获取笔记详情"""
args = {
"feed_id": feed_id,
"xsec_token": xsec_token,
"load_all_comments": load_all_comments,
}
return self._call_tool("get_feed_detail", args)
def get_feed_comments(self, feed_id: str, xsec_token: str,
load_all: bool = True) -> list[dict]:
"""获取笔记评论列表(结构化)
直接返回解析好的评论列表,优先从 raw JSON 解析
"""
result = self.get_feed_detail(feed_id, xsec_token, load_all_comments=load_all)
if "error" in result:
return []
# 优先从 raw 结构中直接提取
raw = result.get("raw", {})
if raw and isinstance(raw, dict):
for item in raw.get("content", []):
if item.get("type") == "text":
try:
data = json.loads(item["text"])
comment_list = self._find_comment_list(data)
if comment_list:
return [self._extract_comment_obj(c)
for c in comment_list
if isinstance(c, dict) and c.get("content")]
except (json.JSONDecodeError, KeyError, TypeError):
pass
# 回退到 text 解析
text = result.get("text", "")
return self._parse_comments(text) if text else []
# ---------- 发布 ----------
def publish_content(self, title: str, content: str, images: list[str],
tags: list[str] = None, schedule_at: str = None) -> dict:
"""发布图文内容"""
args = {
"title": title,
"content": content,
"images": images,
}
if tags:
args["tags"] = tags
if schedule_at:
args["schedule_at"] = schedule_at
return self._call_tool("publish_content", args)
def publish_video(self, title: str, content: str, video_path: str,
tags: list[str] = None, schedule_at: str = None) -> dict:
"""发布视频内容"""
args = {
"title": title,
"content": content,
"video": video_path,
}
if tags:
args["tags"] = tags
if schedule_at:
args["schedule_at"] = schedule_at
return self._call_tool("publish_with_video", args)
# ---------- 评论 ----------
def post_comment(self, feed_id: str, xsec_token: str, comment: str) -> dict:
"""发表评论"""
return self._call_tool("post_comment_to_feed", {
"feed_id": feed_id,
"xsec_token": xsec_token,
"content": comment,
})
def reply_comment(self, feed_id: str, xsec_token: str,
comment_id: str, user_id: str, content: str) -> dict:
"""回复评论"""
return self._call_tool("reply_comment_in_feed", {
"feed_id": feed_id,
"xsec_token": xsec_token,
"comment_id": comment_id,
"user_id": user_id,
"content": content,
})
# ---------- 互动 ----------
def like_feed(self, feed_id: str, xsec_token: str, unlike: bool = False) -> dict:
"""点赞/取消点赞"""
return self._call_tool("like_feed", {
"feed_id": feed_id,
"xsec_token": xsec_token,
"unlike": unlike,
})
def favorite_feed(self, feed_id: str, xsec_token: str,
unfavorite: bool = False) -> dict:
"""收藏/取消收藏"""
return self._call_tool("favorite_feed", {
"feed_id": feed_id,
"xsec_token": xsec_token,
"unfavorite": unfavorite,
})
# ---------- 用户 ----------
def get_user_profile(self, user_id: str, xsec_token: str) -> dict:
"""获取用户主页信息"""
return self._call_tool("user_profile", {
"user_id": user_id,
"xsec_token": xsec_token,
})
# ---------- 登录管理 ----------
def delete_cookies(self) -> dict:
"""删除 cookies重置登录状态"""
return self._call_tool("delete_cookies", {})