xhs_factory/mcp_client.py
zhoujie d782bb6781 feat(auto): 新增自动点赞和自动回复功能,完善项目文档
- 新增自动点赞功能【一键点赞】:支持关键词搜索笔记并随机批量点赞,提升账号活跃度
- 新增自动回复功能【一键回复】:自动扫描用户笔记的粉丝评论,使用AI生成并发送回复
- 扩展自动化调度器【定时调度】:支持点赞和回复任务的随机定时执行,模拟真人操作间隔
- 新增项目文档【文档】:添加README、CHANGELOG、CONTRIBUTING、LICENSE等核心文档文件
- 优化.gitignore文件【配置】:完善Python项目、IDE、敏感文件、日志等忽略规则
- 新增配置文件模板【配置】:提供config.example.json作为配置参考
- 优化MCP客户端【工具】:新增评论解析方法,支持从笔记详情中提取结构化评论数据
2026-02-08 22:40:16 +08:00

426 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
小红书 MCP HTTP 客户端
封装对 xiaohongshu-mcp 服务 (http://localhost:18060/mcp) 的调用
"""
import requests
import json
import logging
import uuid
import base64
import re
import io
from PIL import Image
logger = logging.getLogger(__name__)
MCP_DEFAULT_URL = "http://localhost:18060/mcp"
MCP_TIMEOUT = 60 # 秒
# 全局客户端缓存 —— 同一 URL 复用同一实例,避免反复 initialize
_client_cache: dict[str, "MCPClient"] = {}
def get_mcp_client(base_url: str = MCP_DEFAULT_URL) -> "MCPClient":
"""获取 MCP 客户端(单例),同一 URL 复用同一实例"""
if base_url not in _client_cache:
_client_cache[base_url] = MCPClient(base_url)
client = _client_cache[base_url]
return client
class MCPClient:
"""小红书 MCP 服务的 HTTP 客户端封装"""
def __init__(self, base_url: str = MCP_DEFAULT_URL):
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({"Content-Type": "application/json"})
self._session_id = None
self._initialized = False
# ---------- 底层通信 ----------
def _call(self, method: str, params: dict = None, *,
is_notification: bool = False) -> dict:
"""发送 JSON-RPC 请求到 MCP 服务
Args:
is_notification: 若为 True 则不带 idJSON-RPC 通知)
"""
payload = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
}
# JSON-RPC 通知不带 id
if not is_notification:
payload["id"] = str(uuid.uuid4())
headers = {}
if self._session_id:
headers["mcp-session-id"] = self._session_id
try:
resp = self.session.post(
self.base_url, json=payload, timeout=MCP_TIMEOUT, headers=headers
)
# 保存 session id
if "mcp-session-id" in resp.headers:
self._session_id = resp.headers["mcp-session-id"]
resp.raise_for_status()
# 通知不一定有响应体
if is_notification:
return {"status": "notified"}
data = resp.json()
if "error" in data:
logger.error("MCP error: %s", data["error"])
return {"error": data["error"]}
return data.get("result", data)
except requests.exceptions.ConnectionError:
logger.error("MCP 服务未启动或无法连接: %s", self.base_url)
return {"error": "MCP 服务未启动,请先启动 xiaohongshu-mcp"}
except requests.exceptions.Timeout:
logger.error("MCP 请求超时")
return {"error": "MCP 请求超时,请稍后重试"}
except Exception as e:
logger.error("MCP 调用异常: %s", e)
return {"error": str(e)}
def _ensure_initialized(self):
"""确保 MCP 连接已初始化"""
if not self._initialized:
result = self._call("initialize", {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "xhs-autobot", "version": "2.0.0"}
})
if "error" not in result:
# 发送 initialized 通知JSON-RPC 通知不带 id
self._call("notifications/initialized", {},
is_notification=True)
self._initialized = True
return result
return {"status": "already_initialized"}
def _reset(self):
"""重置初始化状态(下次调用会重新握手)"""
self._initialized = False
self._session_id = None
def _call_tool(self, tool_name: str, arguments: dict = None) -> dict:
"""调用 MCP 工具400 错误时自动重试一次"""
self._ensure_initialized()
result = self._call("tools/call", {
"name": tool_name,
"arguments": arguments or {}
})
# 如果返回 400 相关错误,重置并重试一次
if isinstance(result, dict) and "error" in result:
err_msg = str(result["error"])
if "400" in err_msg or "Bad Request" in err_msg:
logger.warning("MCP 400 错误,重置会话后重试: %s", tool_name)
self._reset()
self._ensure_initialized()
result = self._call("tools/call", {
"name": tool_name,
"arguments": arguments or {}
})
# 提取文本和图片内容
if isinstance(result, dict) and "content" in result:
texts = []
images = []
for item in result["content"]:
if item.get("type") == "text":
texts.append(item["text"])
elif item.get("type") == "image":
# MCP 返回的 base64 图片
img_data = item.get("data", "")
if img_data:
images.append(img_data)
out = {"success": True, "text": "\n".join(texts), "raw": result}
if images:
out["images"] = images
return out
return result
# ---------- 登录 ----------
def get_login_qrcode(self) -> dict:
"""获取登录二维码,返回 {success, text, qr_image(PIL.Image)}"""
result = self._call_tool("get_login_qrcode")
if "error" in result:
return result
# 尝试解析 base64 图片
qr_image = None
if "images" in result and result["images"]:
try:
img_bytes = base64.b64decode(result["images"][0])
qr_image = Image.open(io.BytesIO(img_bytes))
except Exception as e:
logger.warning("二维码图片解析失败: %s", e)
result["qr_image"] = qr_image
return result
def check_login_status(self) -> dict:
"""检查小红书登录状态"""
return self._call_tool("check_login_status")
# ---------- 连接状态 ----------
def check_connection(self) -> tuple[bool, str]:
"""检查 MCP 服务是否可连接"""
result = self._call_tool("check_login_status")
if "error" in result:
return False, result["error"]
return True, result.get("text", "已连接")
# ---------- 搜索 ----------
def search_feeds(self, keyword: str, sort_by: str = "综合",
note_type: str = "不限", publish_time: str = "不限") -> dict:
"""搜索小红书内容"""
args = {
"keyword": keyword,
"filters": {
"sort_by": sort_by,
"note_type": note_type,
"publish_time": publish_time,
}
}
return self._call_tool("search_feeds", args)
# ---------- 推荐列表 ----------
def list_feeds(self) -> dict:
"""获取首页推荐列表"""
return self._call_tool("list_feeds")
# ---------- 笔记列表解析 ----------
@staticmethod
def _parse_feed_entries(text: str) -> list[dict]:
"""从 MCP 返回文本中解析笔记条目为结构化列表"""
entries = []
# 方式1: 尝试直接 JSON 解析
try:
data = json.loads(text)
feeds = []
if isinstance(data, dict) and "feeds" in data:
feeds = data["feeds"]
elif isinstance(data, list):
feeds = data
for feed in feeds:
note = feed.get("noteCard", {})
user = note.get("user", {})
interact = note.get("interactInfo", {})
entries.append({
"feed_id": feed.get("id", ""),
"xsec_token": feed.get("xsecToken", ""),
"title": note.get("displayTitle", "未知标题"),
"author": user.get("nickname", user.get("nickName", "")),
"user_id": user.get("userId", ""),
"likes": interact.get("likedCount", "0"),
"type": note.get("type", ""),
})
if entries:
return entries
except (json.JSONDecodeError, TypeError, AttributeError):
pass
# 方式2: 正则提取 —— 适配 MCP 的文本格式
# 匹配 feed_id (24位十六进制)
feed_ids = re.findall(r'(?:feed_id|id)["\s:]+([0-9a-f]{24})', text, re.I)
# 匹配 xsecToken
tokens = re.findall(r'(?:xsec_?[Tt]oken)["\s:]+([A-Za-z0-9+/=_-]{20,})', text, re.I)
# 匹配标题
titles = re.findall(r'(?:title|标题)["\s:]+(.+?)(?:\n|$)', text, re.I)
# 匹配 userId
user_ids = re.findall(r'(?:user_?[Ii]d|userId)["\s:]+([0-9a-f]{24})', text, re.I)
count = max(len(feed_ids), len(tokens))
for i in range(count):
entries.append({
"feed_id": feed_ids[i] if i < len(feed_ids) else "",
"xsec_token": tokens[i] if i < len(tokens) else "",
"title": titles[i].strip() if i < len(titles) else f"笔记 {i+1}",
"author": "",
"user_id": user_ids[i] if i < len(user_ids) else "",
"likes": "",
"type": "",
})
return entries
def list_feeds_parsed(self) -> list[dict]:
"""获取首页推荐并解析为结构化列表"""
result = self.list_feeds()
if "error" in result:
return []
return self._parse_feed_entries(result.get("text", ""))
def search_feeds_parsed(self, keyword: str, sort_by: str = "综合") -> list[dict]:
"""搜索笔记并解析为结构化列表"""
result = self.search_feeds(keyword, sort_by=sort_by)
if "error" in result:
return []
return self._parse_feed_entries(result.get("text", ""))
@staticmethod
def _parse_comments(text: str) -> list[dict]:
"""从笔记详情文本中解析评论列表为结构化数据
返回: [{comment_id, user_id, nickname, content, sub_comment_count}, ...]
"""
comments = []
# 方式1: 尝试 JSON 解析
try:
data = json.loads(text)
raw_comments = []
if isinstance(data, dict):
raw_comments = data.get("comments", [])
elif isinstance(data, list):
raw_comments = data
for c in raw_comments:
user_info = c.get("userInfo") or c.get("user") or {}
comments.append({
"comment_id": c.get("id", c.get("commentId", "")),
"user_id": user_info.get("userId", user_info.get("user_id", "")),
"nickname": user_info.get("nickname", user_info.get("nickName", "未知")),
"content": c.get("content", ""),
"sub_comment_count": c.get("subCommentCount", 0),
})
if comments:
return comments
except (json.JSONDecodeError, TypeError, AttributeError):
pass
# 方式2: 正则提取 —— 适配多种 MCP 文本格式
# 格式举例: "评论ID: xxx | 用户: xxx (userId) | 内容: xxx"
# 或者: 用户名(@nickname): 评论内容
comment_ids = re.findall(
r'(?:comment_?[Ii]d|评论ID|评论id|"id")["\s:]+([0-9a-f]{24})', text, re.I)
user_ids = re.findall(
r'(?:user_?[Ii]d|userId|用户ID)["\s:]+([0-9a-f]{24})', text, re.I)
nicknames = re.findall(
r'(?:nickname|昵称|用户名|用户)["\s:]+([^\n|,]{1,30})', text, re.I)
contents = re.findall(
r'(?:content|内容|评论内容)["\s:]+([^\n]{1,500})', text, re.I)
count = max(len(comment_ids), len(contents))
for i in range(count):
comments.append({
"comment_id": comment_ids[i] if i < len(comment_ids) else "",
"user_id": user_ids[i] if i < len(user_ids) else "",
"nickname": (nicknames[i].strip() if i < len(nicknames) else ""),
"content": (contents[i].strip() if i < len(contents) else ""),
"sub_comment_count": 0,
})
return comments
# ---------- 帖子详情 ----------
def get_feed_detail(self, feed_id: str, xsec_token: str,
load_all_comments: bool = False) -> dict:
"""获取笔记详情"""
args = {
"feed_id": feed_id,
"xsec_token": xsec_token,
"load_all_comments": load_all_comments,
}
return self._call_tool("get_feed_detail", args)
# ---------- 发布 ----------
def publish_content(self, title: str, content: str, images: list[str],
tags: list[str] = None, schedule_at: str = None) -> dict:
"""发布图文内容"""
args = {
"title": title,
"content": content,
"images": images,
}
if tags:
args["tags"] = tags
if schedule_at:
args["schedule_at"] = schedule_at
return self._call_tool("publish_content", args)
def publish_video(self, title: str, content: str, video_path: str,
tags: list[str] = None, schedule_at: str = None) -> dict:
"""发布视频内容"""
args = {
"title": title,
"content": content,
"video": video_path,
}
if tags:
args["tags"] = tags
if schedule_at:
args["schedule_at"] = schedule_at
return self._call_tool("publish_with_video", args)
# ---------- 评论 ----------
def post_comment(self, feed_id: str, xsec_token: str, comment: str) -> dict:
"""发表评论"""
return self._call_tool("post_comment_to_feed", {
"feed_id": feed_id,
"xsec_token": xsec_token,
"content": comment,
})
def reply_comment(self, feed_id: str, xsec_token: str,
comment_id: str, user_id: str, content: str) -> dict:
"""回复评论"""
return self._call_tool("reply_comment_in_feed", {
"feed_id": feed_id,
"xsec_token": xsec_token,
"comment_id": comment_id,
"user_id": user_id,
"content": content,
})
# ---------- 互动 ----------
def like_feed(self, feed_id: str, xsec_token: str, unlike: bool = False) -> dict:
"""点赞/取消点赞"""
return self._call_tool("like_feed", {
"feed_id": feed_id,
"xsec_token": xsec_token,
"unlike": unlike,
})
def favorite_feed(self, feed_id: str, xsec_token: str,
unfavorite: bool = False) -> dict:
"""收藏/取消收藏"""
return self._call_tool("favorite_feed", {
"feed_id": feed_id,
"xsec_token": xsec_token,
"unfavorite": unfavorite,
})
# ---------- 用户 ----------
def get_user_profile(self, user_id: str, xsec_token: str) -> dict:
"""获取用户主页信息"""
return self._call_tool("user_profile", {
"user_id": user_id,
"xsec_token": xsec_token,
})
# ---------- 登录管理 ----------
def delete_cookies(self) -> dict:
"""删除 cookies重置登录状态"""
return self._call_tool("delete_cookies", {})