xhs_factory/main.py
zhoujie d782bb6781 feat(auto): 新增自动点赞和自动回复功能,完善项目文档
- 新增自动点赞功能【一键点赞】:支持关键词搜索笔记并随机批量点赞,提升账号活跃度
- 新增自动回复功能【一键回复】:自动扫描用户笔记的粉丝评论,使用AI生成并发送回复
- 扩展自动化调度器【定时调度】:支持点赞和回复任务的随机定时执行,模拟真人操作间隔
- 新增项目文档【文档】:添加README、CHANGELOG、CONTRIBUTING、LICENSE等核心文档文件
- 优化.gitignore文件【配置】:完善Python项目、IDE、敏感文件、日志等忽略规则
- 新增配置文件模板【配置】:提供config.example.json作为配置参考
- 优化MCP客户端【工具】:新增评论解析方法,支持从笔记详情中提取结构化评论数据
2026-02-08 22:40:16 +08:00

2256 lines
87 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
小红书 AI 爆文生产工坊 V2.0
全自动工作台:灵感 -> 文案 -> 绘图 -> 发布 -> 运营
"""
import gradio as gr
import os
import re
import json
import time
import logging
import platform
import subprocess
import threading
import random
from datetime import datetime
from PIL import Image
import matplotlib
import matplotlib.pyplot as plt
from config_manager import ConfigManager, OUTPUT_DIR
from llm_service import LLMService
from sd_service import SDService, DEFAULT_NEGATIVE
from mcp_client import MCPClient, get_mcp_client
# ================= matplotlib 中文字体配置 =================
_font_candidates = ["Microsoft YaHei", "SimHei", "PingFang SC", "WenQuanYi Micro Hei"]
for _fn in _font_candidates:
try:
matplotlib.font_manager.findfont(_fn, fallback_to_default=False)
plt.rcParams["font.sans-serif"] = [_fn]
break
except Exception:
continue
plt.rcParams["axes.unicode_minus"] = False
# ================= 日志配置 =================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("autobot.log", encoding="utf-8"),
],
)
logger = logging.getLogger("autobot")
# 强制不走代理连接本地服务
os.environ["NO_PROXY"] = "127.0.0.1,localhost"
# ================= 全局服务初始化 =================
cfg = ConfigManager()
cfg.ensure_workspace()
mcp = get_mcp_client(cfg.get("mcp_url", "http://localhost:18060/mcp"))
# ==================================================
# LLM 多提供商管理
# ==================================================
def _get_llm_config() -> tuple[str, str, str]:
"""获取当前激活 LLM 的 (api_key, base_url, model)"""
p = cfg.get_active_llm()
if p:
return p["api_key"], p["base_url"], cfg.get("model", "")
return "", "", ""
def connect_llm(provider_name):
"""连接选中的 LLM 提供商并获取模型列表"""
if not provider_name:
return gr.update(choices=[], value=None), "⚠️ 请先选择或添加 LLM 提供商"
cfg.set_active_llm(provider_name)
p = cfg.get_active_llm()
if not p:
return gr.update(choices=[], value=None), "❌ 未找到该提供商配置"
try:
svc = LLMService(p["api_key"], p["base_url"])
models = svc.get_models()
if models:
return (
gr.update(choices=models, value=models[0]),
f"✅ 已连接「{provider_name}」,加载 {len(models)} 个模型",
)
else:
# API 无法获取模型列表,保留手动输入
current_model = cfg.get("model", "")
return (
gr.update(choices=[current_model] if current_model else [], value=current_model or None),
f"⚠️ 已连接「{provider_name}」,但未获取到模型列表,请手动输入模型名",
)
except Exception as e:
logger.error("LLM 连接失败: %s", e)
current_model = cfg.get("model", "")
return (
gr.update(choices=[current_model] if current_model else [], value=current_model or None),
f"❌ 连接「{provider_name}」失败: {e}",
)
def add_llm_provider(name, api_key, base_url):
"""添加新的 LLM 提供商"""
msg = cfg.add_llm_provider(name, api_key, base_url)
names = cfg.get_llm_provider_names()
active = cfg.get("active_llm", "")
return (
gr.update(choices=names, value=active),
msg,
)
def remove_llm_provider(provider_name):
"""删除 LLM 提供商"""
if not provider_name:
return gr.update(choices=cfg.get_llm_provider_names(), value=cfg.get("active_llm", "")), "⚠️ 请先选择要删除的提供商"
msg = cfg.remove_llm_provider(provider_name)
names = cfg.get_llm_provider_names()
active = cfg.get("active_llm", "")
return (
gr.update(choices=names, value=active),
msg,
)
def on_provider_selected(provider_name):
"""切换 LLM 提供商时更新显示信息"""
if not provider_name:
return "未选择提供商"
for p in cfg.get_llm_providers():
if p["name"] == provider_name:
cfg.set_active_llm(provider_name)
masked_key = p["api_key"][:8] + "***" if len(p["api_key"]) > 8 else "***"
return f"**{provider_name}** \nAPI Key: `{masked_key}` \nBase URL: `{p['base_url']}`"
return "未找到该提供商"
# ==================================================
# Tab 1: 内容创作
# ==================================================
def connect_sd(sd_url):
"""连接 SD 并获取模型列表"""
try:
svc = SDService(sd_url)
ok, msg = svc.check_connection()
if ok:
models = svc.get_models()
cfg.set("sd_url", sd_url)
return gr.update(choices=models, value=models[0] if models else None), f"{msg}"
return gr.update(choices=[]), f"{msg}"
except Exception as e:
logger.error("SD 连接失败: %s", e)
return gr.update(choices=[]), f"❌ SD 连接失败: {e}"
def check_mcp_status(mcp_url):
"""检查 MCP 连接状态"""
try:
client = get_mcp_client(mcp_url)
ok, msg = client.check_connection()
if ok:
cfg.set("mcp_url", mcp_url)
return f"✅ MCP 服务正常 - {msg}"
return f"{msg}"
except Exception as e:
return f"❌ MCP 连接失败: {e}"
# ==================================================
# 小红书账号登录
# ==================================================
def get_login_qrcode(mcp_url):
"""获取小红书登录二维码"""
try:
client = get_mcp_client(mcp_url)
result = client.get_login_qrcode()
if "error" in result:
return None, f"❌ 获取二维码失败: {result['error']}"
qr_image = result.get("qr_image")
msg = result.get("text", "")
if qr_image:
return qr_image, f"✅ 二维码已生成,请用小红书 App 扫码\n{msg}"
return None, f"⚠️ 未获取到二维码图片MCP 返回:\n{msg}"
except Exception as e:
logger.error("获取登录二维码失败: %s", e)
return None, f"❌ 获取二维码失败: {e}"
def logout_xhs(mcp_url):
"""退出登录:清除 cookies 并重置本地 token"""
try:
client = get_mcp_client(mcp_url)
result = client.delete_cookies()
if "error" in result:
return f"❌ 退出失败: {result['error']}"
cfg.set("xsec_token", "")
client._reset()
return "✅ 已退出登录,可以重新扫码登录"
except Exception as e:
logger.error("退出登录失败: %s", e)
return f"❌ 退出失败: {e}"
def _auto_fetch_xsec_token(mcp_url) -> str:
"""从推荐列表自动获取一个有效的 xsec_token"""
try:
client = get_mcp_client(mcp_url)
entries = client.list_feeds_parsed()
for e in entries:
token = e.get("xsec_token", "")
if token:
return token
except Exception as e:
logger.warning("自动获取 xsec_token 失败: %s", e)
return ""
def check_login(mcp_url):
"""检查登录状态,登录成功后自动获取 xsec_token 并保存"""
try:
client = get_mcp_client(mcp_url)
result = client.check_login_status()
if "error" in result:
return f"{result['error']}", gr.update(), gr.update()
text = result.get("text", "")
if "未登录" in text:
return f"🔴 {text}", gr.update(), gr.update()
# 登录成功 → 自动获取 xsec_token
token = _auto_fetch_xsec_token(mcp_url)
if token:
cfg.set("xsec_token", token)
logger.info("自动获取 xsec_token 成功")
return (
f"🟢 {text}\n\n✅ xsec_token 已自动获取并保存",
gr.update(value=cfg.get("my_user_id", "")),
gr.update(value=token),
)
return f"🟢 {text}\n\n⚠️ 自动获取 xsec_token 失败,请手动刷新", gr.update(), gr.update()
except Exception as e:
return f"❌ 检查登录状态失败: {e}", gr.update(), gr.update()
def save_my_user_id(user_id_input):
"""保存用户 ID (验证 24 位十六进制格式)"""
uid = (user_id_input or "").strip()
if not uid:
cfg.set("my_user_id", "")
return "⚠️ 已清除用户 ID"
if not re.match(r'^[0-9a-fA-F]{24}$', uid):
return (
"❌ 格式错误!用户 ID 应为 24 位十六进制字符串\n"
f"你输入的: `{uid}` ({len(uid)} 位)\n\n"
"💡 如果你输入的是小红书号 (纯数字如 18688457507),那不是 userId。"
)
cfg.set("my_user_id", uid)
return f"✅ 用户 ID 已保存: `{uid}`"
def generate_copy(model, topic, style):
"""生成文案"""
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "", "", "", "", "❌ 请先配置并连接 LLM 提供商"
try:
svc = LLMService(api_key, base_url, model)
data = svc.generate_copy(topic, style)
cfg.set("model", model)
tags = data.get("tags", [])
return (
data.get("title", ""),
data.get("content", ""),
data.get("sd_prompt", ""),
", ".join(tags) if tags else "",
"✅ 文案生成完毕",
)
except Exception as e:
logger.error("文案生成失败: %s", e)
return "", "", "", "", f"❌ 生成失败: {e}"
def generate_images(sd_url, prompt, neg_prompt, model, steps, cfg_scale):
"""生成图片"""
if not model:
return None, [], "❌ 未选择 SD 模型"
try:
svc = SDService(sd_url)
images = svc.txt2img(
prompt=prompt,
negative_prompt=neg_prompt,
model=model,
steps=int(steps),
cfg_scale=float(cfg_scale),
)
return images, images, f"✅ 生成 {len(images)} 张图片"
except Exception as e:
logger.error("图片生成失败: %s", e)
return None, [], f"❌ 绘图失败: {e}"
def one_click_export(title, content, images):
"""导出文案和图片到本地"""
if not title:
return "❌ 无法导出:没有标题"
safe_title = re.sub(r'[\\/*?:"<>|]', "", title)[:20]
folder_name = f"{int(time.time())}_{safe_title}"
folder_path = os.path.join(OUTPUT_DIR, folder_name)
os.makedirs(folder_path, exist_ok=True)
with open(os.path.join(folder_path, "文案.txt"), "w", encoding="utf-8") as f:
f.write(f"{title}\n\n{content}")
saved_paths = []
if images:
for idx, img in enumerate(images):
path = os.path.join(folder_path, f"{idx+1}.png")
if isinstance(img, Image.Image):
img.save(path)
saved_paths.append(os.path.abspath(path))
# 尝试打开文件夹
try:
abs_path = os.path.abspath(folder_path)
if platform.system() == "Windows":
os.startfile(abs_path)
elif platform.system() == "Darwin":
subprocess.call(["open", abs_path])
else:
subprocess.call(["xdg-open", abs_path])
except Exception:
pass
return f"✅ 已导出至: {folder_path} ({len(saved_paths)} 张图片)"
def publish_to_xhs(title, content, tags_str, images, local_images, mcp_url, schedule_time):
"""通过 MCP 发布到小红书"""
if not title:
return "❌ 缺少标题"
client = get_mcp_client(mcp_url)
# 收集图片路径
image_paths = []
# 先保存 AI 生成的图片到临时目录
if images:
temp_dir = os.path.join(OUTPUT_DIR, "_temp_publish")
os.makedirs(temp_dir, exist_ok=True)
for idx, img in enumerate(images):
if isinstance(img, Image.Image):
path = os.path.abspath(os.path.join(temp_dir, f"ai_{idx}.png"))
img.save(path)
image_paths.append(path)
# 添加本地上传的图片
if local_images:
for img_file in local_images:
# Gradio File 组件返回的是 NamedString 或 tempfile path
img_path = img_file.name if hasattr(img_file, 'name') else str(img_file)
if os.path.exists(img_path):
image_paths.append(os.path.abspath(img_path))
if not image_paths:
return "❌ 至少需要 1 张图片才能发布"
# 解析标签
tags = [t.strip().lstrip("#") for t in tags_str.split(",") if t.strip()] if tags_str else None
# 定时发布
schedule = schedule_time if schedule_time and schedule_time.strip() else None
try:
result = client.publish_content(
title=title,
content=content,
images=image_paths,
tags=tags,
schedule_at=schedule,
)
if "error" in result:
return f"❌ 发布失败: {result['error']}"
return f"✅ 发布成功!\n{result.get('text', '')}"
except Exception as e:
logger.error("发布失败: %s", e)
return f"❌ 发布异常: {e}"
# ==================================================
# Tab 2: 热点探测
# ==================================================
def search_hotspots(keyword, sort_by, mcp_url):
"""搜索小红书热门内容"""
if not keyword:
return "❌ 请输入搜索关键词", ""
try:
client = get_mcp_client(mcp_url)
result = client.search_feeds(keyword, sort_by=sort_by)
if "error" in result:
return f"❌ 搜索失败: {result['error']}", ""
text = result.get("text", "无结果")
return "✅ 搜索完成", text
except Exception as e:
logger.error("热点搜索失败: %s", e)
return f"❌ 搜索失败: {e}", ""
def analyze_and_suggest(model, keyword, search_result):
"""AI 分析热点并给出建议"""
if not search_result:
return "❌ 请先搜索", "", ""
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "❌ 请先配置 LLM 提供商", "", ""
try:
svc = LLMService(api_key, base_url, model)
analysis = svc.analyze_hotspots(search_result)
topics = "\n".join(f"{t}" for t in analysis.get("hot_topics", []))
patterns = "\n".join(f"{p}" for p in analysis.get("title_patterns", []))
suggestions = "\n".join(
f"**{s['topic']}** - {s['reason']}"
for s in analysis.get("suggestions", [])
)
structure = analysis.get("content_structure", "")
summary = (
f"## 🔥 热门选题\n{topics}\n\n"
f"## 📝 标题套路\n{patterns}\n\n"
f"## 📐 内容结构\n{structure}\n\n"
f"## 💡 推荐选题\n{suggestions}"
)
return "✅ 分析完成", summary, keyword
except Exception as e:
logger.error("热点分析失败: %s", e)
return f"❌ 分析失败: {e}", "", ""
def generate_from_hotspot(model, topic_from_hotspot, style, search_result):
"""基于热点分析生成文案"""
if not topic_from_hotspot:
return "", "", "", "", "❌ 请先选择或输入选题"
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "", "", "", "", "❌ 请先配置 LLM 提供商"
try:
svc = LLMService(api_key, base_url, model)
data = svc.generate_copy_with_reference(
topic=topic_from_hotspot,
style=style,
reference_notes=search_result[:2000], # 截断防止超长
)
tags = data.get("tags", [])
return (
data.get("title", ""),
data.get("content", ""),
data.get("sd_prompt", ""),
", ".join(tags),
"✅ 基于热点的文案已生成",
)
except Exception as e:
return "", "", "", "", f"❌ 生成失败: {e}"
# ==================================================
# Tab 3: 评论管家
# ==================================================
# ---- 共用: 笔记列表缓存 ----
# 主动评论缓存
_cached_proactive_entries: list[dict] = []
# 我的笔记评论缓存
_cached_my_note_entries: list[dict] = []
def _fetch_and_cache(keyword, mcp_url, cache_name="proactive"):
"""通用: 获取笔记列表并缓存"""
global _cached_proactive_entries, _cached_my_note_entries
try:
client = get_mcp_client(mcp_url)
if keyword and keyword.strip():
entries = client.search_feeds_parsed(keyword.strip())
src = f"搜索「{keyword.strip()}"
else:
entries = client.list_feeds_parsed()
src = "首页推荐"
if cache_name == "proactive":
_cached_proactive_entries = entries
else:
_cached_my_note_entries = entries
if not entries:
return gr.update(choices=[], value=None), f"⚠️ 从{src}未找到笔记"
choices = []
for i, e in enumerate(entries):
title_short = (e["title"] or "无标题")[:28]
label = f"[{i+1}] {title_short} | @{e['author'] or '未知'} | ❤ {e['likes']}"
choices.append(label)
return (
gr.update(choices=choices, value=choices[0]),
f"✅ 从{src}获取 {len(entries)} 条笔记",
)
except Exception as e:
if cache_name == "proactive":
_cached_proactive_entries = []
else:
_cached_my_note_entries = []
return gr.update(choices=[], value=None), f"{e}"
def _pick_from_cache(selected, cache_name="proactive"):
"""通用: 从缓存中提取选中条目的 feed_id / xsec_token / title"""
cache = _cached_proactive_entries if cache_name == "proactive" else _cached_my_note_entries
if not selected or not cache:
return "", "", ""
try:
# 尝试从 [N] 前缀提取序号
idx = int(selected.split("]")[0].replace("[", "")) - 1
if 0 <= idx < len(cache):
e = cache[idx]
return e["feed_id"], e["xsec_token"], e.get("title", "")
except (ValueError, IndexError):
pass
# 回退: 模糊匹配标题
for e in cache:
if e.get("title", "")[:15] in selected:
return e["feed_id"], e["xsec_token"], e.get("title", "")
return "", "", ""
# ---- 模块 A: 主动评论他人 ----
def fetch_proactive_notes(keyword, mcp_url):
return _fetch_and_cache(keyword, mcp_url, "proactive")
def on_proactive_note_selected(selected):
return _pick_from_cache(selected, "proactive")
def load_note_for_comment(feed_id, xsec_token, mcp_url):
"""加载目标笔记详情 (标题+正文+已有评论), 用于 AI 分析"""
if not feed_id or not xsec_token:
return "❌ 请先选择笔记", "", "", ""
try:
client = get_mcp_client(mcp_url)
result = client.get_feed_detail(feed_id, xsec_token, load_all_comments=True)
if "error" in result:
return f"{result['error']}", "", "", ""
full_text = result.get("text", "")
# 尝试分离正文和评论
if "评论" in full_text:
parts = full_text.split("评论", 1)
content_part = parts[0].strip()
comments_part = "评论" + parts[1] if len(parts) > 1 else ""
else:
content_part = full_text[:500]
comments_part = ""
return "✅ 笔记内容已加载", content_part[:800], comments_part[:1500], full_text
except Exception as e:
return f"{e}", "", "", ""
def ai_generate_comment(model, persona,
post_title, post_content, existing_comments):
"""AI 生成主动评论"""
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "⚠️ 请先配置 LLM 提供商", "❌ LLM 未配置"
if not model:
return "⚠️ 请先连接 LLM", "❌ 未选模型"
if not post_title and not post_content:
return "⚠️ 请先加载笔记内容", "❌ 无笔记内容"
try:
svc = LLMService(api_key, base_url, model)
comment = svc.generate_proactive_comment(
persona, post_title, post_content[:600], existing_comments[:800]
)
return comment, "✅ 评论已生成"
except Exception as e:
logger.error(f"AI 评论生成失败: {e}")
return f"生成失败: {e}", f"{e}"
def send_comment(feed_id, xsec_token, comment_content, mcp_url):
"""发送评论到别人的笔记"""
if not all([feed_id, xsec_token, comment_content]):
return "❌ 缺少必要参数 (笔记ID / token / 评论内容)"
try:
client = get_mcp_client(mcp_url)
result = client.post_comment(feed_id, xsec_token, comment_content)
if "error" in result:
return f"{result['error']}"
return "✅ 评论已发送!"
except Exception as e:
return f"{e}"
# ---- 模块 B: 回复我的笔记评论 ----
def fetch_my_notes(mcp_url):
"""通过已保存的 userId 获取我的笔记列表"""
global _cached_my_note_entries
my_uid = cfg.get("my_user_id", "")
xsec = cfg.get("xsec_token", "")
if not my_uid:
return (
gr.update(choices=[], value=None),
"❌ 未配置用户 ID请先到「账号登录」页填写并保存",
)
if not xsec:
return (
gr.update(choices=[], value=None),
"❌ 未获取 xsec_token请先登录",
)
try:
client = get_mcp_client(mcp_url)
result = client.get_user_profile(my_uid, xsec)
if "error" in result:
return gr.update(choices=[], value=None), f"{result['error']}"
# 从 raw 中解析 feeds
raw = result.get("raw", {})
text = result.get("text", "")
data = None
if raw and isinstance(raw, dict):
for item in raw.get("content", []):
if item.get("type") == "text":
try:
data = json.loads(item["text"])
except (json.JSONDecodeError, KeyError):
pass
if not data:
try:
data = json.loads(text)
except (json.JSONDecodeError, TypeError):
pass
feeds = (data or {}).get("feeds") or []
if not feeds:
return (
gr.update(choices=[], value=None),
"⚠️ 未找到你的笔记,可能账号还没有发布内容",
)
entries = []
for f in feeds:
nc = f.get("noteCard") or {}
user = nc.get("user") or {}
interact = nc.get("interactInfo") or {}
entries.append({
"feed_id": f.get("id", ""),
"xsec_token": f.get("xsecToken", ""),
"title": nc.get("displayTitle", "未知标题"),
"author": user.get("nickname", user.get("nickName", "")),
"user_id": user.get("userId", ""),
"likes": interact.get("likedCount", "0"),
"type": nc.get("type", ""),
})
_cached_my_note_entries = entries
choices = [
f"[{i+1}] {e['title'][:20]} | {e['type']} | ❤{e['likes']}"
for i, e in enumerate(entries)
]
return (
gr.update(choices=choices, value=choices[0] if choices else None),
f"✅ 找到 {len(entries)} 篇笔记",
)
except Exception as e:
return gr.update(choices=[], value=None), f"{e}"
def on_my_note_selected(selected):
return _pick_from_cache(selected, "my_notes")
def fetch_my_note_comments(feed_id, xsec_token, mcp_url):
"""获取我的笔记的评论列表"""
if not feed_id or not xsec_token:
return "❌ 请先选择笔记", ""
try:
client = get_mcp_client(mcp_url)
result = client.get_feed_detail(feed_id, xsec_token, load_all_comments=True)
if "error" in result:
return f"{result['error']}", ""
return "✅ 评论加载完成", result.get("text", "暂无评论")
except Exception as e:
return f"{e}", ""
def ai_reply_comment(model, persona, post_title, comment_text):
"""AI 生成评论回复"""
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "⚠️ 请先配置 LLM 提供商", "❌ LLM 未配置"
if not model:
return "⚠️ 请先连接 LLM 并选择模型", "❌ 未选择模型"
if not comment_text:
return "请输入需要回复的评论内容", "⚠️ 请输入评论"
try:
svc = LLMService(api_key, base_url, model)
reply = svc.generate_reply(persona, post_title, comment_text)
return reply, "✅ 回复已生成"
except Exception as e:
logger.error(f"AI 回复生成失败: {e}")
return f"生成失败: {e}", f"{e}"
def send_reply(feed_id, xsec_token, reply_content, mcp_url):
"""发送评论回复"""
if not all([feed_id, xsec_token, reply_content]):
return "❌ 缺少必要参数"
try:
client = get_mcp_client(mcp_url)
result = client.post_comment(feed_id, xsec_token, reply_content)
if "error" in result:
return f"❌ 回复失败: {result['error']}"
return "✅ 回复已发送"
except Exception as e:
return f"❌ 发送失败: {e}"
# ==================================================
# Tab 4: 数据看板 (我的账号)
# ==================================================
def _parse_profile_json(text: str):
"""尝试从文本中解析用户 profile JSON"""
if not text:
return None
# 直接 JSON
try:
return json.loads(text)
except (json.JSONDecodeError, TypeError):
pass
# 可能包含 Markdown 代码块
m = re.search(r'```(?:json)?\s*\n([\s\S]+?)\n```', text)
if m:
try:
return json.loads(m.group(1))
except (json.JSONDecodeError, TypeError):
pass
return None
def _parse_count(val) -> float:
"""解析数字字符串, 支持 '1.2万' 格式"""
if isinstance(val, (int, float)):
return float(val)
s = str(val).strip()
if "" in s:
try:
return float(s.replace("", "")) * 10000
except ValueError:
pass
try:
return float(s)
except ValueError:
return 0.0
def fetch_my_profile(user_id, xsec_token, mcp_url):
"""获取我的账号数据, 返回结构化信息 + 可视化图表"""
if not user_id or not xsec_token:
return "❌ 请填写你的用户 ID 和 xsec_token", "", None, None, None
try:
client = get_mcp_client(mcp_url)
result = client.get_user_profile(user_id, xsec_token)
if "error" in result:
return f"{result['error']}", "", None, None, None
raw = result.get("raw", {})
text = result.get("text", "")
# 尝试从 raw 或 text 解析 JSON
data = None
if raw and isinstance(raw, dict):
content_list = raw.get("content", [])
for item in content_list:
if item.get("type") == "text":
data = _parse_profile_json(item.get("text", ""))
if data:
break
if not data:
data = _parse_profile_json(text)
if not data:
return "✅ 数据加载完成 (纯文本)", text, None, None, None
# ---- 提取基本信息 (注意 MCP 对新号可能返回 null) ----
basic = data.get("userBasicInfo") or {}
interactions = data.get("interactions") or []
feeds = data.get("feeds") or []
gender_map = {0: "未知", 1: "", 2: ""}
info_lines = [
f"## 👤 {basic.get('nickname', '未知')}",
f"- **小红书号**: {basic.get('redId', '-')}",
f"- **性别**: {gender_map.get(basic.get('gender', 0), '未知')}",
f"- **IP 属地**: {basic.get('ipLocation', '-')}",
f"- **简介**: {basic.get('desc', '-')}",
"",
"### 📊 核心数据",
]
for inter in interactions:
info_lines.append(f"- **{inter.get('name', '')}**: {inter.get('count', '0')}")
info_lines.append(f"\n### 📝 展示笔记: {len(feeds)}")
profile_md = "\n".join(info_lines)
# ---- 互动数据柱状图 ----
fig_interact = None
if interactions:
inter_data = {i["name"]: _parse_count(i["count"]) for i in interactions}
fig_interact, ax = plt.subplots(figsize=(4, 3), dpi=100)
labels = list(inter_data.keys())
values = list(inter_data.values())
colors = ["#FF6B6B", "#4ECDC4", "#45B7D1"][:len(labels)]
ax.bar(labels, values, color=colors, edgecolor="white", linewidth=0.5)
ax.set_title("账号核心指标", fontsize=12, fontweight="bold")
for i, v in enumerate(values):
display = f"{v/10000:.1f}" if v >= 10000 else str(int(v))
ax.text(i, v + max(values) * 0.02, display, ha="center", fontsize=9)
ax.set_ylabel("")
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
fig_interact.tight_layout()
# ---- 笔记点赞分布图 ----
fig_notes = None
if feeds:
titles, likes = [], []
for f in feeds[:15]:
nc = f.get("noteCard") or {}
t = (nc.get("displayTitle", "") or "无标题")[:12]
lk = _parse_count((nc.get("interactInfo") or {}).get("likedCount", "0"))
titles.append(t)
likes.append(lk)
fig_notes, ax2 = plt.subplots(figsize=(7, 3.5), dpi=100)
ax2.barh(range(len(titles)), likes, color="#FF6B6B", edgecolor="white")
ax2.set_yticks(range(len(titles)))
ax2.set_yticklabels(titles, fontsize=8)
ax2.set_title(f"笔记点赞排行 (Top {len(titles)})", fontsize=12, fontweight="bold")
ax2.invert_yaxis()
for i, v in enumerate(likes):
display = f"{v/10000:.1f}" if v >= 10000 else str(int(v))
ax2.text(v + max(likes) * 0.01 if max(likes) > 0 else 0, i, display, va="center", fontsize=8)
ax2.spines["top"].set_visible(False)
ax2.spines["right"].set_visible(False)
fig_notes.tight_layout()
# ---- 笔记详情表格 (Markdown) ----
table_lines = [
"### 📋 笔记数据明细",
"| # | 标题 | 类型 | ❤ 点赞 |",
"|---|------|------|--------|",
]
for i, f in enumerate(feeds):
nc = f.get("noteCard") or {}
t = (nc.get("displayTitle", "") or "无标题")[:25]
tp = "📹 视频" if nc.get("type") == "video" else "📷 图文"
lk = (nc.get("interactInfo") or {}).get("likedCount", "0")
table_lines.append(f"| {i+1} | {t} | {tp} | {lk} |")
notes_table = "\n".join(table_lines)
return "✅ 数据加载完成", profile_md, fig_interact, fig_notes, notes_table
except Exception as e:
logger.error(f"获取我的数据失败: {e}")
return f"{e}", "", None, None, None
# ==================================================
# 自动化运营模块
# ==================================================
# 自动化状态
_auto_running = threading.Event()
_auto_thread: threading.Thread | None = None
_auto_log: list[str] = []
DEFAULT_TOPICS = [
"春季穿搭", "通勤穿搭", "约会穿搭", "显瘦穿搭", "平价好物",
"护肤心得", "妆容教程", "好物分享", "生活好物", "减脂餐分享",
"居家好物", "收纳技巧", "咖啡探店", "书单推荐", "旅行攻略",
]
DEFAULT_STYLES = ["好物种草", "干货教程", "情绪共鸣", "生活Vlog", "测评避雷"]
DEFAULT_COMMENT_KEYWORDS = [
"穿搭", "美食", "护肤", "好物推荐", "旅行", "生活日常", "减脂",
]
def _auto_log_append(msg: str):
"""记录自动化日志"""
ts = datetime.now().strftime("%H:%M:%S")
entry = f"[{ts}] {msg}"
_auto_log.append(entry)
if len(_auto_log) > 500:
_auto_log[:] = _auto_log[-300:]
logger.info("[自动化] %s", msg)
def _auto_comment_with_log(keywords_str, mcp_url, model, persona_text):
"""一键评论 + 同步刷新日志"""
msg = auto_comment_once(keywords_str, mcp_url, model, persona_text)
return msg, get_auto_log()
def auto_comment_once(keywords_str, mcp_url, model, persona_text):
"""一键评论:自动搜索高赞笔记 → AI生成评论 → 发送"""
try:
keywords = [k.strip() for k in keywords_str.split(",") if k.strip()] if keywords_str else DEFAULT_COMMENT_KEYWORDS
keyword = random.choice(keywords)
_auto_log_append(f"🔍 搜索关键词: {keyword}")
client = get_mcp_client(mcp_url)
# 搜索高赞笔记
entries = client.search_feeds_parsed(keyword, sort_by="最多点赞")
if not entries:
_auto_log_append("⚠️ 搜索无结果,尝试推荐列表")
entries = client.list_feeds_parsed()
if not entries:
return "❌ 未找到任何笔记"
# 过滤掉自己的笔记
my_uid = cfg.get("my_user_id", "")
if my_uid:
filtered = [e for e in entries if e.get("user_id") != my_uid]
if filtered:
entries = filtered
# 从前10个中随机选择
target = random.choice(entries[:min(10, len(entries))])
feed_id = target["feed_id"]
xsec_token = target["xsec_token"]
title = target.get("title", "未知")
_auto_log_append(f"🎯 选中: {title[:30]} (@{target.get('author', '未知')})")
if not feed_id or not xsec_token:
return "❌ 笔记缺少必要参数 (feed_id/xsec_token)"
# 模拟浏览延迟
time.sleep(random.uniform(2, 5))
# 加载笔记详情
result = client.get_feed_detail(feed_id, xsec_token, load_all_comments=True)
if "error" in result:
return f"❌ 加载笔记失败: {result['error']}"
full_text = result.get("text", "")
if "评论" in full_text:
parts = full_text.split("评论", 1)
content_part = parts[0].strip()[:600]
comments_part = ("评论" + parts[1])[:800] if len(parts) > 1 else ""
else:
content_part = full_text[:500]
comments_part = ""
# AI 生成评论
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "❌ LLM 未配置,请先在全局设置中配置提供商"
svc = LLMService(api_key, base_url, model)
comment = svc.generate_proactive_comment(
persona_text, title, content_part, comments_part
)
_auto_log_append(f"💬 生成评论: {comment[:60]}...")
# 随机等待后发送
time.sleep(random.uniform(3, 8))
result = client.post_comment(feed_id, xsec_token, comment)
resp_text = result.get("text", "")
_auto_log_append(f"📡 MCP 响应: {resp_text[:200]}")
if "error" in result:
_auto_log_append(f"❌ 评论发送失败: {result['error']}")
return f"❌ 评论发送失败: {result['error']}"
# 检查是否真正成功
if "成功" not in resp_text and "success" not in resp_text.lower() and not resp_text:
_auto_log_append(f"⚠️ 评论可能未成功MCP 原始响应: {result}")
return f"⚠️ 评论状态不确定,请手动检查\nMCP 响应: {resp_text[:300]}\n📝 评论: {comment}"
_auto_log_append(f"✅ 评论已发送到「{title[:20]}")
return f"✅ 已评论「{title[:25]}\n📝 评论: {comment}\n\n💡 小红书可能有内容审核延迟,请稍等 1-2 分钟后查看"
except Exception as e:
_auto_log_append(f"❌ 一键评论异常: {e}")
return f"❌ 评论失败: {e}"
def _auto_like_with_log(keywords_str, like_count, mcp_url):
"""一键点赞 + 同步刷新日志"""
msg = auto_like_once(keywords_str, like_count, mcp_url)
return msg, get_auto_log()
def auto_like_once(keywords_str, like_count, mcp_url):
"""一键点赞:搜索/推荐笔记 → 随机选择 → 批量点赞"""
try:
keywords = [k.strip() for k in keywords_str.split(",") if k.strip()] if keywords_str else DEFAULT_COMMENT_KEYWORDS
keyword = random.choice(keywords)
like_count = int(like_count) if like_count else 5
_auto_log_append(f"👍 点赞关键词: {keyword} | 目标: {like_count}")
client = get_mcp_client(mcp_url)
# 搜索笔记
entries = client.search_feeds_parsed(keyword, sort_by="综合")
if not entries:
_auto_log_append("⚠️ 搜索无结果,尝试推荐列表")
entries = client.list_feeds_parsed()
if not entries:
return "❌ 未找到任何笔记"
# 过滤自己的笔记
my_uid = cfg.get("my_user_id", "")
if my_uid:
filtered = [e for e in entries if e.get("user_id") != my_uid]
if filtered:
entries = filtered
# 随机打乱,取前 N 个
random.shuffle(entries)
targets = entries[:min(like_count, len(entries))]
liked = 0
for target in targets:
feed_id = target.get("feed_id", "")
xsec_token = target.get("xsec_token", "")
title = target.get("title", "未知")[:25]
if not feed_id or not xsec_token:
continue
# 模拟浏览延迟
time.sleep(random.uniform(2, 6))
result = client.like_feed(feed_id, xsec_token)
if "error" in result:
_auto_log_append(f" ❌ 点赞失败「{title}」: {result['error']}")
else:
liked += 1
_auto_log_append(f" ❤️ 已点赞「{title}」@{target.get('author', '未知')}")
_auto_log_append(f"👍 点赞完成: 成功 {liked}/{len(targets)}")
return f"✅ 点赞完成!成功 {liked}/{len(targets)}"
except Exception as e:
_auto_log_append(f"❌ 一键点赞异常: {e}")
return f"❌ 点赞失败: {e}"
def _auto_publish_with_log(topics_str, mcp_url, sd_url_val, sd_model_name, model):
"""一键发布 + 同步刷新日志"""
msg = auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model)
return msg, get_auto_log()
def _auto_reply_with_log(max_replies, mcp_url, model, persona_text):
"""一键回复 + 同步刷新日志"""
msg = auto_reply_once(max_replies, mcp_url, model, persona_text)
return msg, get_auto_log()
def auto_reply_once(max_replies, mcp_url, model, persona_text):
"""一键回复:获取我的笔记 → 加载评论 → AI 生成回复 → 发送"""
try:
my_uid = cfg.get("my_user_id", "")
xsec = cfg.get("xsec_token", "")
if not my_uid:
return "❌ 未配置用户 ID请到「账号登录」页填写"
if not xsec:
return "❌ 未获取 xsec_token请先登录"
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "❌ LLM 未配置"
max_replies = int(max_replies) if max_replies else 3
client = get_mcp_client(mcp_url)
_auto_log_append("💌 开始自动回复评论...")
# Step 1: 获取我的笔记列表
result = client.get_user_profile(my_uid, xsec)
if "error" in result:
_auto_log_append(f"❌ 获取我的笔记失败: {result['error']}")
return f"❌ 获取我的笔记失败: {result['error']}"
# 解析笔记列表
raw = result.get("raw", {})
text = result.get("text", "")
data = None
if raw and isinstance(raw, dict):
for item in raw.get("content", []):
if item.get("type") == "text":
try:
data = json.loads(item["text"])
except (json.JSONDecodeError, KeyError):
pass
if not data:
try:
data = json.loads(text)
except (json.JSONDecodeError, TypeError):
pass
feeds = (data or {}).get("feeds") or []
if not feeds:
_auto_log_append("⚠️ 未找到任何笔记")
return "⚠️ 未找到你的笔记"
# 构建笔记条目
my_entries = []
for f in feeds:
nc = f.get("noteCard") or {}
my_entries.append({
"feed_id": f.get("id", ""),
"xsec_token": f.get("xsecToken", ""),
"title": nc.get("displayTitle", "未知标题"),
})
_auto_log_append(f"📝 找到 {len(my_entries)} 篇笔记,开始扫描评论...")
# Step 2: 遍历笔记,找到未回复的评论
total_replied = 0
svc = LLMService(api_key, base_url, model)
for entry in my_entries:
if total_replied >= max_replies:
break
feed_id = entry["feed_id"]
xsec_token = entry["xsec_token"]
title = entry["title"]
if not feed_id or not xsec_token:
continue
time.sleep(random.uniform(1, 3))
# 加载笔记详情(含评论)
detail = client.get_feed_detail(feed_id, xsec_token, load_all_comments=True)
if "error" in detail:
_auto_log_append(f"⚠️ 加载「{title[:15]}」评论失败,跳过")
continue
full_text = detail.get("text", "")
# 解析评论
comments = client._parse_comments(full_text)
if not comments:
continue
# 过滤掉自己的评论,只回复他人
other_comments = [
c for c in comments
if c.get("user_id") and c["user_id"] != my_uid and c.get("content")
]
if not other_comments:
continue
_auto_log_append(f"📖「{title[:20]}」有 {len(other_comments)} 条他人评论")
for comment in other_comments:
if total_replied >= max_replies:
break
comment_id = comment.get("comment_id", "")
comment_uid = comment.get("user_id", "")
comment_text = comment.get("content", "")
nickname = comment.get("nickname", "网友")
if not comment_text.strip():
continue
_auto_log_append(f" 💬 @{nickname}: {comment_text[:40]}...")
# AI 生成回复
try:
reply = svc.generate_reply(persona_text, title, comment_text)
except Exception as e:
_auto_log_append(f" ❌ AI 回复生成失败: {e}")
continue
_auto_log_append(f" 🤖 回复: {reply[:50]}...")
# 发送回复
time.sleep(random.uniform(2, 6))
if comment_id and comment_uid:
# 使用 reply_comment 精确回复
resp = client.reply_comment(
feed_id, xsec_token, comment_id, comment_uid, reply
)
else:
# 没有 comment_id 就用 post_comment 发到笔记下
resp = client.post_comment(feed_id, xsec_token, f"@{nickname} {reply}")
resp_text = resp.get("text", "")
if "error" in resp:
_auto_log_append(f" ❌ 回复发送失败: {resp['error']}")
else:
_auto_log_append(f" ✅ 已回复 @{nickname}")
total_replied += 1
if total_replied == 0:
_auto_log_append(" 没有找到需要回复的新评论")
return " 没有找到需要回复的新评论\n\n💡 可能所有评论都已回复过"
else:
_auto_log_append(f"✅ 自动回复完成,共回复 {total_replied} 条评论")
return f"✅ 自动回复完成!共回复 {total_replied} 条评论\n\n💡 小红书审核可能有延迟,请稍后查看"
except Exception as e:
_auto_log_append(f"❌ 自动回复异常: {e}")
return f"❌ 自动回复失败: {e}"
def auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model):
"""一键发布:自动生成文案 → 生成图片 → 发布到小红书"""
try:
topics = [t.strip() for t in topics_str.split(",") if t.strip()] if topics_str else DEFAULT_TOPICS
topic = random.choice(topics)
style = random.choice(DEFAULT_STYLES)
_auto_log_append(f"📝 主题: {topic} | 风格: {style}")
# 生成文案
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "❌ LLM 未配置,请先在全局设置中配置提供商"
svc = LLMService(api_key, base_url, model)
data = svc.generate_copy(topic, style)
title = (data.get("title", "") or "")[:20]
content = data.get("content", "")
sd_prompt = data.get("sd_prompt", "")
tags = data.get("tags", [])
if not title:
return "❌ 文案生成失败:无标题"
_auto_log_append(f"📄 文案: {title}")
# 生成图片
if not sd_url_val or not sd_model_name:
return "❌ SD WebUI 未连接或未选择模型,请先在全局设置中连接"
sd_svc = SDService(sd_url_val)
images = sd_svc.txt2img(prompt=sd_prompt, model=sd_model_name)
if not images:
return "❌ 图片生成失败:没有返回图片"
_auto_log_append(f"🎨 已生成 {len(images)} 张图片")
# 保存图片到临时目录
temp_dir = os.path.join(OUTPUT_DIR, "_temp_publish")
os.makedirs(temp_dir, exist_ok=True)
image_paths = []
ts = int(time.time())
for idx, img in enumerate(images):
if isinstance(img, Image.Image):
path = os.path.abspath(os.path.join(temp_dir, f"auto_{ts}_{idx}.png"))
img.save(path)
image_paths.append(path)
if not image_paths:
return "❌ 图片保存失败"
# 发布到小红书
client = get_mcp_client(mcp_url)
result = client.publish_content(
title=title, content=content, images=image_paths, tags=tags
)
if "error" in result:
_auto_log_append(f"❌ 发布失败: {result['error']}")
return f"❌ 发布失败: {result['error']}"
_auto_log_append(f"🚀 发布成功: {title}")
return f"✅ 发布成功!\n📌 标题: {title}\n{result.get('text', '')}"
except Exception as e:
_auto_log_append(f"❌ 一键发布异常: {e}")
return f"❌ 发布失败: {e}"
def _scheduler_loop(comment_enabled, publish_enabled, reply_enabled, like_enabled,
comment_min, comment_max, publish_min, publish_max,
reply_min, reply_max, max_replies_per_run,
like_min, like_max, like_count_per_run,
keywords, topics, mcp_url, sd_url_val, sd_model_name,
model, persona_text):
"""后台定时调度循环"""
_auto_log_append("🤖 自动化调度器已启动")
# 首次执行的随机延迟
next_comment = time.time() + random.randint(10, 60)
next_publish = time.time() + random.randint(30, 120)
next_reply = time.time() + random.randint(15, 90)
next_like = time.time() + random.randint(5, 40)
while _auto_running.is_set():
now = time.time()
# 自动评论
if comment_enabled and now >= next_comment:
try:
_auto_log_append("--- 🔄 执行自动评论 ---")
msg = auto_comment_once(keywords, mcp_url, model, persona_text)
_auto_log_append(msg)
except Exception as e:
_auto_log_append(f"❌ 自动评论异常: {e}")
interval = random.randint(int(comment_min) * 60, int(comment_max) * 60)
next_comment = time.time() + interval
_auto_log_append(f"⏰ 下次评论: {interval // 60} 分钟后")
# 自动点赞
if like_enabled and now >= next_like:
try:
_auto_log_append("--- 🔄 执行自动点赞 ---")
msg = auto_like_once(keywords, like_count_per_run, mcp_url)
_auto_log_append(msg)
except Exception as e:
_auto_log_append(f"❌ 自动点赞异常: {e}")
interval = random.randint(int(like_min) * 60, int(like_max) * 60)
next_like = time.time() + interval
_auto_log_append(f"⏰ 下次点赞: {interval // 60} 分钟后")
# 自动发布
if publish_enabled and now >= next_publish:
try:
_auto_log_append("--- 🔄 执行自动发布 ---")
msg = auto_publish_once(topics, mcp_url, sd_url_val, sd_model_name, model)
_auto_log_append(msg)
except Exception as e:
_auto_log_append(f"❌ 自动发布异常: {e}")
interval = random.randint(int(publish_min) * 60, int(publish_max) * 60)
next_publish = time.time() + interval
_auto_log_append(f"⏰ 下次发布: {interval // 60} 分钟后")
# 自动回复评论
if reply_enabled and now >= next_reply:
try:
_auto_log_append("--- 🔄 执行自动回复评论 ---")
msg = auto_reply_once(max_replies_per_run, mcp_url, model, persona_text)
_auto_log_append(msg)
except Exception as e:
_auto_log_append(f"❌ 自动回复异常: {e}")
interval = random.randint(int(reply_min) * 60, int(reply_max) * 60)
next_reply = time.time() + interval
_auto_log_append(f"⏰ 下次回复: {interval // 60} 分钟后")
# 每5秒检查一次停止信号
for _ in range(5):
if not _auto_running.is_set():
break
time.sleep(1)
_auto_log_append("🛑 自动化调度器已停止")
def start_scheduler(comment_on, publish_on, reply_on, like_on,
c_min, c_max, p_min, p_max, r_min, r_max,
max_replies_per_run,
l_min, l_max, like_count_per_run,
keywords, topics, mcp_url, sd_url_val, sd_model_name,
model, persona_text):
"""启动定时自动化"""
global _auto_thread
if _auto_running.is_set():
return "⚠️ 调度器已在运行中,请先停止"
if not comment_on and not publish_on and not reply_on and not like_on:
return "❌ 请至少启用一项自动化功能"
# 评论/回复需要 LLM点赞不需要
if (comment_on or reply_on):
api_key, _, _ = _get_llm_config()
if not api_key:
return "❌ LLM 未配置,请先在全局设置中配置提供商"
_auto_running.set()
_auto_thread = threading.Thread(
target=_scheduler_loop,
args=(comment_on, publish_on, reply_on, like_on,
c_min, c_max, p_min, p_max, r_min, r_max,
max_replies_per_run,
l_min, l_max, like_count_per_run,
keywords, topics, mcp_url, sd_url_val, sd_model_name,
model, persona_text),
daemon=True,
)
_auto_thread.start()
parts = []
if comment_on:
parts.append(f"评论 (每 {int(c_min)}-{int(c_max)} 分钟)")
if like_on:
parts.append(f"点赞 (每 {int(l_min)}-{int(l_max)} 分钟, {int(like_count_per_run)}个/轮)")
if publish_on:
parts.append(f"发布 (每 {int(p_min)}-{int(p_max)} 分钟)")
if reply_on:
parts.append(f"回复 (每 {int(r_min)}-{int(r_max)} 分钟, 每轮≤{int(max_replies_per_run)}条)")
_auto_log_append(f"调度器已启动: {' + '.join(parts)}")
return f"✅ 自动化已启动 🟢\n任务: {' | '.join(parts)}\n\n💡 点击「刷新日志」查看实时进度"
def stop_scheduler():
"""停止定时自动化"""
if not _auto_running.is_set():
return "⚠️ 调度器未在运行"
_auto_running.clear()
_auto_log_append("⏹️ 收到停止信号,等待当前任务完成...")
return "🛑 调度器停止中...当前任务完成后将完全停止"
def get_auto_log():
"""获取自动化运行日志"""
if not _auto_log:
return "📋 暂无日志\n\n💡 点击「一键评论」「一键发布」或启动定时后日志将在此显示"
return "\n".join(_auto_log[-80:])
def get_scheduler_status():
"""获取调度器运行状态"""
if _auto_running.is_set():
return "🟢 **调度器运行中**"
return "⚪ **调度器未运行**"
# ==================================================
# UI 构建
# ==================================================
config = cfg.all
with gr.Blocks(
title="小红书 AI 爆文工坊 V2.0",
theme=gr.themes.Soft(),
css="""
.status-ok { color: #16a34a; font-weight: bold; }
.status-err { color: #dc2626; font-weight: bold; }
footer { display: none !important; }
""",
) as app:
gr.Markdown(
"# 🍒 小红书 AI 爆文生产工坊 V2.0\n"
"> 灵感 → 文案 → 绘图 → 发布 → 运营,一站式全闭环"
)
# 全局状态
state_images = gr.State([])
state_search_result = gr.State("")
# ============ 全局设置栏 ============
with gr.Accordion("⚙️ 全局设置 (自动保存)", open=False):
gr.Markdown("#### 🤖 LLM 提供商 (支持所有 OpenAI 兼容接口)")
with gr.Row():
llm_provider = gr.Dropdown(
label="选择 LLM 提供商",
choices=cfg.get_llm_provider_names(),
value=cfg.get("active_llm", ""),
interactive=True, scale=2,
)
btn_connect_llm = gr.Button("🔗 连接 LLM", size="sm", scale=1)
with gr.Row():
llm_model = gr.Dropdown(
label="LLM 模型", value=config["model"],
allow_custom_value=True, interactive=True, scale=2,
)
llm_provider_info = gr.Markdown(
value="*选择提供商后显示详情*",
)
with gr.Accordion(" 添加 / 管理 LLM 提供商", open=False):
with gr.Row():
new_provider_name = gr.Textbox(
label="名称", placeholder="如: DeepSeek / GPT-4o / 通义千问",
scale=1,
)
new_provider_key = gr.Textbox(
label="API Key", type="password", scale=2,
)
new_provider_url = gr.Textbox(
label="Base URL", placeholder="https://api.openai.com/v1",
value="https://api.openai.com/v1", scale=2,
)
with gr.Row():
btn_add_provider = gr.Button("✅ 添加提供商", variant="primary", size="sm")
btn_del_provider = gr.Button("🗑️ 删除当前提供商", variant="stop", size="sm")
provider_mgmt_status = gr.Markdown("")
gr.Markdown("---")
with gr.Row():
mcp_url = gr.Textbox(
label="MCP Server URL", value=config["mcp_url"], scale=2,
)
sd_url = gr.Textbox(
label="SD WebUI URL", value=config["sd_url"], scale=2,
)
persona = gr.Textbox(
label="博主人设(评论回复用)",
value=config["persona"], scale=3,
)
with gr.Row():
btn_connect_sd = gr.Button("🎨 连接 SD", size="sm")
btn_check_mcp = gr.Button("📡 检查 MCP", size="sm")
with gr.Row():
sd_model = gr.Dropdown(
label="SD 模型", allow_custom_value=True,
interactive=True, scale=2,
)
status_bar = gr.Markdown("🔄 等待连接...")
# ============ Tab 页面 ============
with gr.Tabs():
# -------- Tab 1: 内容创作 --------
with gr.Tab("✨ 内容创作"):
with gr.Row():
# 左栏:输入
with gr.Column(scale=1):
gr.Markdown("### 💡 构思")
topic = gr.Textbox(label="笔记主题", placeholder="例如:优衣库早春穿搭")
style = gr.Dropdown(
["好物种草", "干货教程", "情绪共鸣", "生活Vlog", "测评避雷", "知识科普"],
label="风格", value="好物种草",
)
btn_gen_copy = gr.Button("✨ 第一步:生成文案", variant="primary")
gr.Markdown("---")
gr.Markdown("### 🎨 绘图参数")
with gr.Accordion("高级设置", open=False):
neg_prompt = gr.Textbox(
label="反向提示词", value=DEFAULT_NEGATIVE, lines=2,
)
steps = gr.Slider(15, 50, value=25, step=1, label="步数")
cfg_scale = gr.Slider(1, 15, value=7, step=0.5, label="CFG Scale")
btn_gen_img = gr.Button("🎨 第二步:生成图片", variant="primary")
# 中栏:文案编辑
with gr.Column(scale=1):
gr.Markdown("### 📝 文案编辑")
res_title = gr.Textbox(label="标题 (≤20字)", interactive=True)
res_content = gr.TextArea(
label="正文 (可手动修改)", lines=12, interactive=True,
)
res_prompt = gr.TextArea(
label="绘图提示词", lines=3, interactive=True,
)
res_tags = gr.Textbox(
label="话题标签 (逗号分隔)", interactive=True,
placeholder="穿搭, 春季, 好物种草",
)
# 右栏:预览 & 发布
with gr.Column(scale=1):
gr.Markdown("### 🖼️ 视觉预览")
gallery = gr.Gallery(label="AI 生成图片", columns=2, height=300)
local_images = gr.File(
label="📁 上传本地图片(可混排)",
file_count="multiple",
file_types=["image"],
)
gr.Markdown("### 🚀 发布")
schedule_time = gr.Textbox(
label="定时发布 (可选, ISO8601格式)",
placeholder="如 2026-02-08T18:00:00+08:00留空=立即发布",
)
with gr.Row():
btn_export = gr.Button("📂 导出本地", variant="secondary")
btn_publish = gr.Button("🚀 发布到小红书", variant="primary")
publish_msg = gr.Markdown("")
# -------- Tab 2: 热点探测 --------
with gr.Tab("🔥 热点探测"):
gr.Markdown("### 搜索热门内容 → AI 分析趋势 → 一键借鉴创作")
with gr.Row():
with gr.Column(scale=1):
hot_keyword = gr.Textbox(
label="搜索关键词", placeholder="如:春季穿搭",
)
hot_sort = gr.Dropdown(
["综合", "最新", "最多点赞", "最多评论", "最多收藏"],
label="排序", value="综合",
)
btn_search = gr.Button("🔍 搜索", variant="primary")
search_status = gr.Markdown("")
with gr.Column(scale=2):
search_output = gr.TextArea(
label="搜索结果", lines=12, interactive=False,
)
with gr.Row():
btn_analyze = gr.Button("🧠 AI 分析热点趋势", variant="primary")
analysis_status = gr.Markdown("")
analysis_output = gr.Markdown(label="分析报告")
topic_from_hot = gr.Textbox(
label="选择/输入创作选题", placeholder="基于分析选一个方向",
)
with gr.Row():
hot_style = gr.Dropdown(
["好物种草", "干货教程", "情绪共鸣", "生活Vlog", "测评避雷"],
label="风格", value="好物种草",
)
btn_gen_from_hot = gr.Button("✨ 基于热点生成文案", variant="primary")
with gr.Row():
hot_title = gr.Textbox(label="生成的标题", interactive=True)
hot_content = gr.TextArea(label="生成的正文", lines=8, interactive=True)
with gr.Row():
hot_prompt = gr.TextArea(label="绘图提示词", lines=3, interactive=True)
hot_tags = gr.Textbox(label="标签", interactive=True)
hot_gen_status = gr.Markdown("")
btn_sync_to_create = gr.Button(
"📋 同步到「内容创作」Tab → 绘图 & 发布",
variant="primary",
)
# -------- Tab 3: 评论管家 --------
with gr.Tab("💬 评论管家"):
gr.Markdown("### 智能评论管理:主动评论引流 & 自动回复粉丝")
with gr.Tabs():
# ======== 子 Tab A: 主动评论他人 ========
with gr.Tab("✍️ 主动评论引流"):
gr.Markdown(
"> **流程**:搜索/浏览笔记 → 选择目标 → 加载内容 → "
"AI 分析笔记+已有评论自动生成高质量评论 → 一键发送"
)
# 笔记选择器
with gr.Row():
pro_keyword = gr.Textbox(
label="🔍 搜索关键词 (留空则获取推荐)",
placeholder="穿搭、美食、旅行…",
)
btn_pro_fetch = gr.Button("🔍 获取笔记", variant="primary")
with gr.Row():
pro_selector = gr.Dropdown(
label="📋 选择目标笔记",
choices=[], interactive=True,
)
pro_fetch_status = gr.Markdown("")
# 隐藏字段
with gr.Row():
pro_feed_id = gr.Textbox(label="笔记 ID", interactive=False)
pro_xsec_token = gr.Textbox(label="xsec_token", interactive=False)
pro_title = gr.Textbox(label="标题", interactive=False)
# 加载内容 & AI 分析
btn_pro_load = gr.Button("📖 加载笔记内容", variant="secondary")
pro_load_status = gr.Markdown("")
with gr.Row():
with gr.Column(scale=1):
pro_content = gr.TextArea(
label="📄 笔记正文摘要", lines=8, interactive=False,
)
with gr.Column(scale=1):
pro_comments = gr.TextArea(
label="💬 已有评论", lines=8, interactive=False,
)
# 隐藏: 完整文本
pro_full_text = gr.Textbox(visible=False)
gr.Markdown("---")
with gr.Row():
with gr.Column(scale=1):
btn_pro_ai = gr.Button(
"🤖 AI 智能生成评论", variant="primary", size="lg",
)
pro_ai_status = gr.Markdown("")
with gr.Column(scale=2):
pro_comment_text = gr.TextArea(
label="✏️ 评论内容 (可手动修改)", lines=3,
interactive=True,
placeholder="点击左侧按钮自动生成,也可手动编写",
)
with gr.Row():
btn_pro_send = gr.Button("📩 发送评论", variant="primary")
pro_send_status = gr.Markdown("")
# ======== 子 Tab B: 回复我的评论 ========
with gr.Tab("💌 回复粉丝评论"):
gr.Markdown(
"> **流程**:选择我的笔记 → 加载评论 → "
"粘贴要回复的评论 → AI 生成回复 → 一键发送"
)
# 笔记选择器 (自动用已保存的 userId 获取)
with gr.Row():
btn_my_fetch = gr.Button("🔍 获取我的笔记", variant="primary")
with gr.Row():
my_selector = gr.Dropdown(
label="📋 选择我的笔记",
choices=[], interactive=True,
)
my_fetch_status = gr.Markdown("")
with gr.Row():
my_feed_id = gr.Textbox(label="笔记 ID", interactive=False)
my_xsec_token = gr.Textbox(label="xsec_token", interactive=False)
my_title = gr.Textbox(label="笔记标题", interactive=False)
btn_my_load_comments = gr.Button("📥 加载评论", variant="primary")
my_comment_status = gr.Markdown("")
my_comments_display = gr.TextArea(
label="📋 粉丝评论列表", lines=12, interactive=False,
)
gr.Markdown("---")
gr.Markdown("#### 📝 回复评论")
with gr.Row():
with gr.Column(scale=1):
my_target_comment = gr.TextArea(
label="要回复的评论内容", lines=3,
placeholder="从上方评论列表中复制粘贴要回复的评论",
)
btn_my_ai_reply = gr.Button(
"🤖 AI 生成回复", variant="secondary",
)
my_reply_gen_status = gr.Markdown("")
with gr.Column(scale=1):
my_reply_content = gr.TextArea(
label="回复内容 (可修改)", lines=3,
interactive=True,
)
btn_my_send_reply = gr.Button(
"📩 发送回复", variant="primary",
)
my_reply_status = gr.Markdown("")
# -------- Tab 4: 账号登录 --------
with gr.Tab("🔐 账号登录"):
gr.Markdown(
"### 小红书账号登录\n"
"> 扫码登录后自动获取 xsec_token配合用户 ID 即可使用所有功能"
)
with gr.Row():
with gr.Column(scale=1):
gr.Markdown(
"**操作步骤:**\n"
"1. 确保 MCP 服务已启动\n"
"2. 点击「获取登录二维码」→ 用小红书 App 扫码\n"
"3. 点击「检查登录状态」→ 自动获取并保存 xsec_token\n"
"4. 首次使用请填写你的用户 ID 并点击保存\n\n"
"⚠️ 登录后不要在其他网页端登录同一账号,否则会被踢出"
)
btn_get_qrcode = gr.Button(
"📱 获取登录二维码", variant="primary", size="lg",
)
btn_check_login = gr.Button(
"🔍 检查登录状态 (自动获取 Token)",
variant="secondary", size="lg",
)
btn_logout = gr.Button(
"🚪 退出登录 (重新扫码)",
variant="stop", size="lg",
)
login_status = gr.Markdown("🔄 等待操作...")
gr.Markdown("---")
gr.Markdown(
"#### 📌 我的账号信息\n"
"> **注意**: 小红书号 ≠ 用户 ID\n"
"> - **小红书号 (redId)**: 如 `18688457507`,是你在 App 个人页看到的\n"
"> - **用户 ID (userId)**: 如 `5a695db6e8ac2b72e8af2a53`24位十六进制字符串\n\n"
"💡 **如何获取 userId?**\n"
"1. 用浏览器打开你的小红书主页\n"
"2. 网址格式为: `xiaohongshu.com/user/profile/xxxxxxxx`\n"
"3. `profile/` 后面的就是你的 userId"
)
login_user_id = gr.Textbox(
label="我的用户 ID (24位 userId, 非小红书号)",
value=config.get("my_user_id", ""),
placeholder="如: 5a695db6e8ac2b72e8af2a53",
)
login_xsec_token = gr.Textbox(
label="xsec_token (登录后自动获取)",
value=config.get("xsec_token", ""),
interactive=False,
)
btn_save_uid = gr.Button(
"💾 保存用户 ID", variant="secondary",
)
save_uid_status = gr.Markdown("")
with gr.Column(scale=1):
qr_image = gr.Image(
label="扫码登录", height=350, width=350,
)
# -------- Tab 5: 数据看板 --------
with gr.Tab("📊 数据看板"):
gr.Markdown(
"### 我的账号数据看板\n"
"> 用户 ID 和 xsec_token 从「账号登录」自动获取,直接点击加载即可"
)
with gr.Row():
with gr.Column(scale=1):
data_user_id = gr.Textbox(
label="我的用户 ID (自动填充)",
value=config.get("my_user_id", ""),
interactive=True,
)
data_xsec_token = gr.Textbox(
label="xsec_token (自动填充)",
value=config.get("xsec_token", ""),
interactive=True,
)
btn_refresh_token = gr.Button(
"🔄 刷新 Token", variant="secondary",
)
btn_load_my_data = gr.Button(
"📊 加载我的数据", variant="primary", size="lg",
)
data_status = gr.Markdown("")
with gr.Column(scale=2):
profile_card = gr.Markdown(
value="*等待加载...*",
label="账号概览",
)
gr.Markdown("---")
gr.Markdown("### 📈 数据可视化")
with gr.Row():
with gr.Column(scale=1):
chart_interact = gr.Plot(label="📊 核心指标")
with gr.Column(scale=2):
chart_notes = gr.Plot(label="❤ 笔记点赞排行")
gr.Markdown("---")
notes_detail = gr.Markdown(
value="*加载数据后显示笔记明细表格*",
label="笔记数据明细",
)
# -------- Tab 6: 自动运营 --------
with gr.Tab("🤖 自动运营"):
gr.Markdown(
"### 🤖 无人值守自动化运营\n"
"> 一键评论引流 + 一键回复粉丝 + 一键内容发布 + 随机定时全自动\n\n"
"⚠️ **注意**: 请确保已连接 LLM、SD WebUI 和 MCP 服务"
)
with gr.Row():
# 左栏: 一键操作
with gr.Column(scale=1):
gr.Markdown("#### 💬 一键智能评论")
gr.Markdown(
"> 自动搜索高赞笔记 → AI 分析内容 → 生成评论 → 发送\n"
"每次随机选关键词搜索,从结果中随机选笔记"
)
auto_comment_keywords = gr.Textbox(
label="评论关键词池 (逗号分隔)",
value="穿搭, 美食, 护肤, 好物推荐, 旅行, 生活日常",
placeholder="关键词1, 关键词2, ...",
)
btn_auto_comment = gr.Button(
"💬 一键评论 (单次)", variant="primary", size="lg",
)
auto_comment_result = gr.Markdown("")
gr.Markdown("---")
gr.Markdown("#### <20> 一键自动点赞")
gr.Markdown(
"> 搜索笔记 → 随机选择多篇 → 依次点赞\n"
"提升账号活跃度,无需 LLM"
)
auto_like_count = gr.Number(
label="单次点赞数量", value=5, minimum=1, maximum=20,
)
btn_auto_like = gr.Button(
"👍 一键点赞 (单次)", variant="primary", size="lg",
)
auto_like_result = gr.Markdown("")
gr.Markdown("---")
gr.Markdown("#### <20>💌 一键自动回复")
gr.Markdown(
"> 扫描我的所有笔记 → 找到粉丝评论 → AI 生成回复 → 逐条发送\n"
"自动跳过自己的评论,模拟真人间隔回复"
)
auto_reply_max = gr.Number(
label="单次最多回复条数", value=5, minimum=1, maximum=20,
)
btn_auto_reply = gr.Button(
"💌 一键回复 (单次)", variant="primary", size="lg",
)
auto_reply_result = gr.Markdown("")
gr.Markdown("---")
gr.Markdown("#### 🚀 一键智能发布")
gr.Markdown(
"> 随机选主题+风格 → AI 生成文案 → SD 生成图片 → 自动发布"
)
auto_publish_topics = gr.Textbox(
label="主题池 (逗号分隔)",
value="春季穿搭, 通勤穿搭, 显瘦穿搭, 平价好物, 护肤心得, 好物分享",
placeholder="主题1, 主题2, ...",
)
btn_auto_publish = gr.Button(
"🚀 一键发布 (单次)", variant="primary", size="lg",
)
auto_publish_result = gr.Markdown("")
# 右栏: 定时自动化
with gr.Column(scale=1):
gr.Markdown("#### ⏰ 随机定时自动化")
gr.Markdown(
"> 设置时间间隔后启动,系统将在随机时间自动执行\n"
"> 模拟真人操作节奏,降低被检测风险"
)
sched_status = gr.Markdown("⚪ **调度器未运行**")
with gr.Group():
sched_comment_on = gr.Checkbox(
label="✅ 启用自动评论", value=True,
)
with gr.Row():
sched_c_min = gr.Number(
label="评论最小间隔(分钟)", value=15, minimum=5,
)
sched_c_max = gr.Number(
label="评论最大间隔(分钟)", value=45, minimum=10,
)
with gr.Group():
sched_like_on = gr.Checkbox(
label="✅ 启用自动点赞", value=True,
)
with gr.Row():
sched_l_min = gr.Number(
label="点赞最小间隔(分钟)", value=10, minimum=3,
)
sched_l_max = gr.Number(
label="点赞最大间隔(分钟)", value=30, minimum=5,
)
sched_like_count = gr.Number(
label="每轮点赞数量", value=5, minimum=1, maximum=15,
)
with gr.Group():
sched_reply_on = gr.Checkbox(
label="✅ 启用自动回复评论", value=True,
)
with gr.Row():
sched_r_min = gr.Number(
label="回复最小间隔(分钟)", value=20, minimum=5,
)
sched_r_max = gr.Number(
label="回复最大间隔(分钟)", value=60, minimum=10,
)
sched_reply_max = gr.Number(
label="每轮最多回复条数", value=3, minimum=1, maximum=10,
)
with gr.Group():
sched_publish_on = gr.Checkbox(
label="✅ 启用自动发布", value=True,
)
with gr.Row():
sched_p_min = gr.Number(
label="发布最小间隔(分钟)", value=60, minimum=30,
)
sched_p_max = gr.Number(
label="发布最大间隔(分钟)", value=180, minimum=60,
)
with gr.Row():
btn_start_sched = gr.Button(
"▶️ 启动定时", variant="primary", size="lg",
)
btn_stop_sched = gr.Button(
"⏹️ 停止定时", variant="stop", size="lg",
)
sched_result = gr.Markdown("")
gr.Markdown("---")
gr.Markdown("#### 📋 运行日志")
with gr.Row():
btn_refresh_log = gr.Button("🔄 刷新日志", size="sm")
btn_clear_log = gr.Button("🗑️ 清空日志", size="sm")
auto_log_display = gr.TextArea(
label="自动化运行日志",
value="📋 暂无日志\n\n💡 执行操作后日志将在此显示",
lines=15,
interactive=False,
)
# ==================================================
# 事件绑定
# ==================================================
# ---- 全局设置: LLM 提供商管理 ----
btn_connect_llm.click(
fn=connect_llm, inputs=[llm_provider],
outputs=[llm_model, status_bar],
)
llm_provider.change(
fn=on_provider_selected,
inputs=[llm_provider],
outputs=[llm_provider_info],
)
btn_add_provider.click(
fn=add_llm_provider,
inputs=[new_provider_name, new_provider_key, new_provider_url],
outputs=[llm_provider, provider_mgmt_status],
)
btn_del_provider.click(
fn=remove_llm_provider,
inputs=[llm_provider],
outputs=[llm_provider, provider_mgmt_status],
)
btn_connect_sd.click(
fn=connect_sd, inputs=[sd_url],
outputs=[sd_model, status_bar],
)
btn_check_mcp.click(
fn=check_mcp_status, inputs=[mcp_url],
outputs=[status_bar],
)
# ---- Tab 1: 内容创作 ----
btn_gen_copy.click(
fn=generate_copy,
inputs=[llm_model, topic, style],
outputs=[res_title, res_content, res_prompt, res_tags, status_bar],
)
btn_gen_img.click(
fn=generate_images,
inputs=[sd_url, res_prompt, neg_prompt, sd_model, steps, cfg_scale],
outputs=[gallery, state_images, status_bar],
)
btn_export.click(
fn=one_click_export,
inputs=[res_title, res_content, state_images],
outputs=[publish_msg],
)
btn_publish.click(
fn=publish_to_xhs,
inputs=[res_title, res_content, res_tags, state_images,
local_images, mcp_url, schedule_time],
outputs=[publish_msg],
)
# ---- Tab 2: 热点探测 ----
btn_search.click(
fn=search_hotspots,
inputs=[hot_keyword, hot_sort, mcp_url],
outputs=[search_status, search_output],
)
# 搜索结果同步到 state
search_output.change(
fn=lambda x: x, inputs=[search_output], outputs=[state_search_result],
)
btn_analyze.click(
fn=analyze_and_suggest,
inputs=[llm_model, hot_keyword, search_output],
outputs=[analysis_status, analysis_output, topic_from_hot],
)
btn_gen_from_hot.click(
fn=generate_from_hotspot,
inputs=[llm_model, topic_from_hot, hot_style, search_output],
outputs=[hot_title, hot_content, hot_prompt, hot_tags, hot_gen_status],
)
# 同步热点文案到内容创作 Tab
btn_sync_to_create.click(
fn=lambda t, c, p, tg: (t, c, p, tg, "✅ 已同步到「内容创作」,可切换 Tab 继续绘图和发布"),
inputs=[hot_title, hot_content, hot_prompt, hot_tags],
outputs=[res_title, res_content, res_prompt, res_tags, status_bar],
)
# ---- Tab 3: 评论管家 ----
# == 子 Tab A: 主动评论引流 ==
btn_pro_fetch.click(
fn=fetch_proactive_notes,
inputs=[pro_keyword, mcp_url],
outputs=[pro_selector, pro_fetch_status],
)
pro_selector.change(
fn=on_proactive_note_selected,
inputs=[pro_selector],
outputs=[pro_feed_id, pro_xsec_token, pro_title],
)
btn_pro_load.click(
fn=load_note_for_comment,
inputs=[pro_feed_id, pro_xsec_token, mcp_url],
outputs=[pro_load_status, pro_content, pro_comments, pro_full_text],
)
btn_pro_ai.click(
fn=ai_generate_comment,
inputs=[llm_model, persona,
pro_title, pro_content, pro_comments],
outputs=[pro_comment_text, pro_ai_status],
)
btn_pro_send.click(
fn=send_comment,
inputs=[pro_feed_id, pro_xsec_token, pro_comment_text, mcp_url],
outputs=[pro_send_status],
)
# == 子 Tab B: 回复粉丝评论 ==
btn_my_fetch.click(
fn=fetch_my_notes,
inputs=[mcp_url],
outputs=[my_selector, my_fetch_status],
)
my_selector.change(
fn=on_my_note_selected,
inputs=[my_selector],
outputs=[my_feed_id, my_xsec_token, my_title],
)
btn_my_load_comments.click(
fn=fetch_my_note_comments,
inputs=[my_feed_id, my_xsec_token, mcp_url],
outputs=[my_comment_status, my_comments_display],
)
btn_my_ai_reply.click(
fn=ai_reply_comment,
inputs=[llm_model, persona,
my_title, my_target_comment],
outputs=[my_reply_content, my_reply_gen_status],
)
btn_my_send_reply.click(
fn=send_reply,
inputs=[my_feed_id, my_xsec_token, my_reply_content, mcp_url],
outputs=[my_reply_status],
)
# ---- Tab 4: 账号登录 ----
btn_get_qrcode.click(
fn=get_login_qrcode,
inputs=[mcp_url],
outputs=[qr_image, login_status],
)
btn_check_login.click(
fn=check_login,
inputs=[mcp_url],
outputs=[login_status, login_user_id, login_xsec_token],
)
btn_logout.click(
fn=logout_xhs,
inputs=[mcp_url],
outputs=[login_status],
)
btn_save_uid.click(
fn=save_my_user_id,
inputs=[login_user_id],
outputs=[save_uid_status],
)
# ---- Tab 5: 数据看板 ----
def refresh_xsec_token(mcp_url):
token = _auto_fetch_xsec_token(mcp_url)
if token:
cfg.set("xsec_token", token)
return gr.update(value=token), "✅ Token 已刷新"
return gr.update(value=cfg.get("xsec_token", "")), "❌ 刷新失败,请确认已登录"
btn_refresh_token.click(
fn=refresh_xsec_token,
inputs=[mcp_url],
outputs=[data_xsec_token, data_status],
)
btn_load_my_data.click(
fn=fetch_my_profile,
inputs=[data_user_id, data_xsec_token, mcp_url],
outputs=[data_status, profile_card, chart_interact, chart_notes, notes_detail],
)
# ---- Tab 6: 自动运营 ----
btn_auto_comment.click(
fn=_auto_comment_with_log,
inputs=[auto_comment_keywords, mcp_url, llm_model, persona],
outputs=[auto_comment_result, auto_log_display],
)
btn_auto_like.click(
fn=_auto_like_with_log,
inputs=[auto_comment_keywords, auto_like_count, mcp_url],
outputs=[auto_like_result, auto_log_display],
)
btn_auto_reply.click(
fn=_auto_reply_with_log,
inputs=[auto_reply_max, mcp_url, llm_model, persona],
outputs=[auto_reply_result, auto_log_display],
)
btn_auto_publish.click(
fn=_auto_publish_with_log,
inputs=[auto_publish_topics, mcp_url, sd_url, sd_model, llm_model],
outputs=[auto_publish_result, auto_log_display],
)
btn_start_sched.click(
fn=start_scheduler,
inputs=[sched_comment_on, sched_publish_on, sched_reply_on, sched_like_on,
sched_c_min, sched_c_max, sched_p_min, sched_p_max,
sched_r_min, sched_r_max, sched_reply_max,
sched_l_min, sched_l_max, sched_like_count,
auto_comment_keywords, auto_publish_topics,
mcp_url, sd_url, sd_model, llm_model, persona],
outputs=[sched_result],
)
btn_stop_sched.click(
fn=stop_scheduler,
inputs=[],
outputs=[sched_result],
)
btn_refresh_log.click(
fn=lambda: (get_auto_log(), get_scheduler_status()),
inputs=[],
outputs=[auto_log_display, sched_status],
)
btn_clear_log.click(
fn=lambda: (_auto_log.clear() or "📋 日志已清空"),
inputs=[],
outputs=[auto_log_display],
)
# ---- 启动时自动刷新 SD ----
app.load(fn=connect_sd, inputs=[sd_url], outputs=[sd_model, status_bar])
# ==================================================
if __name__ == "__main__":
logger.info("🍒 小红书 AI 爆文工坊 V2.0 启动中...")
app.launch(inbrowser=True, share=False)