xhs_factory/mcp_client.py
zhoujie 88faca150d feat(project): 初始化小红书AI爆文工坊V2.0项目
- 新增项目配置文件(.gitignore, config.json)和核心文档(Todo.md, mcp.md)
- 实现配置管理模块(config_manager.py),支持单例模式和自动保存
- 实现LLM服务模块(llm_service.py),包含文案生成、热点分析、评论回复等Prompt模板
- 实现SD服务模块(sd_service.py),封装Stable Diffusion WebUI API调用
- 实现MCP客户端模块(mcp_client.py),封装小红书MCP服务HTTP调用
- 实现主程序(main.py),构建Gradio界面,包含内容创作、热点探测、评论管家、账号登录、数据看板五大功能模块
- 保留V1版本备份(main_v1_backup.py)供参考
- 添加项目依赖文件(requirements.txt)
2026-02-08 14:21:50 +08:00

322 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
小红书 MCP HTTP 客户端
封装对 xiaohongshu-mcp 服务 (http://localhost:18060/mcp) 的调用
"""
import requests
import json
import logging
import uuid
import base64
import re
import io
from PIL import Image
logger = logging.getLogger(__name__)
MCP_DEFAULT_URL = "http://localhost:18060/mcp"
MCP_TIMEOUT = 60 # 秒
class MCPClient:
"""小红书 MCP 服务的 HTTP 客户端封装"""
def __init__(self, base_url: str = MCP_DEFAULT_URL):
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({"Content-Type": "application/json"})
self._session_id = None
self._initialized = False
# ---------- 底层通信 ----------
def _call(self, method: str, params: dict = None) -> dict:
"""发送 JSON-RPC 请求到 MCP 服务"""
payload = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
"id": str(uuid.uuid4()),
}
headers = {}
if self._session_id:
headers["mcp-session-id"] = self._session_id
try:
resp = self.session.post(
self.base_url, json=payload, timeout=MCP_TIMEOUT, headers=headers
)
# 保存 session id
if "mcp-session-id" in resp.headers:
self._session_id = resp.headers["mcp-session-id"]
resp.raise_for_status()
data = resp.json()
if "error" in data:
logger.error("MCP error: %s", data["error"])
return {"error": data["error"]}
return data.get("result", data)
except requests.exceptions.ConnectionError:
logger.error("MCP 服务未启动或无法连接: %s", self.base_url)
return {"error": "MCP 服务未启动,请先启动 xiaohongshu-mcp"}
except requests.exceptions.Timeout:
logger.error("MCP 请求超时")
return {"error": "MCP 请求超时,请稍后重试"}
except Exception as e:
logger.error("MCP 调用异常: %s", e)
return {"error": str(e)}
def _ensure_initialized(self):
"""确保 MCP 连接已初始化"""
if not self._initialized:
result = self._call("initialize", {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "xhs-autobot", "version": "2.0.0"}
})
if "error" not in result:
# 发送 initialized 通知
self._call("notifications/initialized", {})
self._initialized = True
return result
return {"status": "already_initialized"}
def _call_tool(self, tool_name: str, arguments: dict = None) -> dict:
"""调用 MCP 工具"""
self._ensure_initialized()
result = self._call("tools/call", {
"name": tool_name,
"arguments": arguments or {}
})
# 提取文本和图片内容
if isinstance(result, dict) and "content" in result:
texts = []
images = []
for item in result["content"]:
if item.get("type") == "text":
texts.append(item["text"])
elif item.get("type") == "image":
# MCP 返回的 base64 图片
img_data = item.get("data", "")
if img_data:
images.append(img_data)
out = {"success": True, "text": "\n".join(texts), "raw": result}
if images:
out["images"] = images
return out
return result
# ---------- 登录 ----------
def get_login_qrcode(self) -> dict:
"""获取登录二维码,返回 {success, text, qr_image(PIL.Image)}"""
result = self._call_tool("get_login_qrcode")
if "error" in result:
return result
# 尝试解析 base64 图片
qr_image = None
if "images" in result and result["images"]:
try:
img_bytes = base64.b64decode(result["images"][0])
qr_image = Image.open(io.BytesIO(img_bytes))
except Exception as e:
logger.warning("二维码图片解析失败: %s", e)
result["qr_image"] = qr_image
return result
def check_login_status(self) -> dict:
"""检查小红书登录状态"""
return self._call_tool("check_login_status")
# ---------- 连接状态 ----------
def check_connection(self) -> tuple[bool, str]:
"""检查 MCP 服务是否可连接"""
result = self._call_tool("check_login_status")
if "error" in result:
return False, result["error"]
return True, result.get("text", "已连接")
# ---------- 搜索 ----------
def search_feeds(self, keyword: str, sort_by: str = "综合",
note_type: str = "不限", publish_time: str = "不限") -> dict:
"""搜索小红书内容"""
args = {
"keyword": keyword,
"filters": {
"sort_by": sort_by,
"note_type": note_type,
"publish_time": publish_time,
}
}
return self._call_tool("search_feeds", args)
# ---------- 推荐列表 ----------
def list_feeds(self) -> dict:
"""获取首页推荐列表"""
return self._call_tool("list_feeds")
# ---------- 笔记列表解析 ----------
@staticmethod
def _parse_feed_entries(text: str) -> list[dict]:
"""从 MCP 返回文本中解析笔记条目为结构化列表"""
entries = []
# 方式1: 尝试直接 JSON 解析
try:
data = json.loads(text)
feeds = []
if isinstance(data, dict) and "feeds" in data:
feeds = data["feeds"]
elif isinstance(data, list):
feeds = data
for feed in feeds:
note = feed.get("noteCard", {})
user = note.get("user", {})
interact = note.get("interactInfo", {})
entries.append({
"feed_id": feed.get("id", ""),
"xsec_token": feed.get("xsecToken", ""),
"title": note.get("displayTitle", "未知标题"),
"author": user.get("nickname", user.get("nickName", "")),
"user_id": user.get("userId", ""),
"likes": interact.get("likedCount", "0"),
"type": note.get("type", ""),
})
if entries:
return entries
except (json.JSONDecodeError, TypeError, AttributeError):
pass
# 方式2: 正则提取 —— 适配 MCP 的文本格式
# 匹配 feed_id (24位十六进制)
feed_ids = re.findall(r'(?:feed_id|id)["\s:]+([0-9a-f]{24})', text, re.I)
# 匹配 xsecToken
tokens = re.findall(r'(?:xsec_?[Tt]oken)["\s:]+([A-Za-z0-9+/=_-]{20,})', text, re.I)
# 匹配标题
titles = re.findall(r'(?:title|标题)["\s:]+(.+?)(?:\n|$)', text, re.I)
# 匹配 userId
user_ids = re.findall(r'(?:user_?[Ii]d|userId)["\s:]+([0-9a-f]{24})', text, re.I)
count = max(len(feed_ids), len(tokens))
for i in range(count):
entries.append({
"feed_id": feed_ids[i] if i < len(feed_ids) else "",
"xsec_token": tokens[i] if i < len(tokens) else "",
"title": titles[i].strip() if i < len(titles) else f"笔记 {i+1}",
"author": "",
"user_id": user_ids[i] if i < len(user_ids) else "",
"likes": "",
"type": "",
})
return entries
def list_feeds_parsed(self) -> list[dict]:
"""获取首页推荐并解析为结构化列表"""
result = self.list_feeds()
if "error" in result:
return []
return self._parse_feed_entries(result.get("text", ""))
def search_feeds_parsed(self, keyword: str, sort_by: str = "综合") -> list[dict]:
"""搜索笔记并解析为结构化列表"""
result = self.search_feeds(keyword, sort_by=sort_by)
if "error" in result:
return []
return self._parse_feed_entries(result.get("text", ""))
# ---------- 帖子详情 ----------
def get_feed_detail(self, feed_id: str, xsec_token: str,
load_all_comments: bool = False) -> dict:
"""获取笔记详情"""
args = {
"feed_id": feed_id,
"xsec_token": xsec_token,
"load_all_comments": load_all_comments,
}
return self._call_tool("get_feed_detail", args)
# ---------- 发布 ----------
def publish_content(self, title: str, content: str, images: list[str],
tags: list[str] = None, schedule_at: str = None) -> dict:
"""发布图文内容"""
args = {
"title": title,
"content": content,
"images": images,
}
if tags:
args["tags"] = tags
if schedule_at:
args["schedule_at"] = schedule_at
return self._call_tool("publish_content", args)
def publish_video(self, title: str, content: str, video_path: str,
tags: list[str] = None, schedule_at: str = None) -> dict:
"""发布视频内容"""
args = {
"title": title,
"content": content,
"video": video_path,
}
if tags:
args["tags"] = tags
if schedule_at:
args["schedule_at"] = schedule_at
return self._call_tool("publish_with_video", args)
# ---------- 评论 ----------
def post_comment(self, feed_id: str, xsec_token: str, comment: str) -> dict:
"""发表评论"""
return self._call_tool("post_comment_to_feed", {
"feed_id": feed_id,
"xsec_token": xsec_token,
"content": comment,
})
def reply_comment(self, feed_id: str, xsec_token: str,
comment_id: str, user_id: str, content: str) -> dict:
"""回复评论"""
return self._call_tool("reply_comment_in_feed", {
"feed_id": feed_id,
"xsec_token": xsec_token,
"comment_id": comment_id,
"user_id": user_id,
"content": content,
})
# ---------- 互动 ----------
def like_feed(self, feed_id: str, xsec_token: str, unlike: bool = False) -> dict:
"""点赞/取消点赞"""
return self._call_tool("like_feed", {
"feed_id": feed_id,
"xsec_token": xsec_token,
"unlike": unlike,
})
def favorite_feed(self, feed_id: str, xsec_token: str,
unfavorite: bool = False) -> dict:
"""收藏/取消收藏"""
return self._call_tool("favorite_feed", {
"feed_id": feed_id,
"xsec_token": xsec_token,
"unfavorite": unfavorite,
})
# ---------- 用户 ----------
def get_user_profile(self, user_id: str, xsec_token: str) -> dict:
"""获取用户主页信息"""
return self._call_tool("user_profile", {
"user_id": user_id,
"xsec_token": xsec_token,
})