xhs_factory/main.py
zhoujie 500e47ebcb feat(系统): 新增 Windows 开机自启功能
- 新增开机自启管理模块,支持静默后台启动
- 创建 `_autostart.bat` 和 `_autostart.vbs` 脚本实现无窗口启动
- 在 UI 设置页面添加开机自启开关控件
- 通过注册表管理自启项,支持启用/禁用状态切换

♻️ refactor(评论): 优化评论解析逻辑并增强 AI 回复自然度

- 重构 `get_feed_comments` 方法,优先从结构化 JSON 提取评论数据
- 改进 `_parse_comments` 方法,支持多种嵌套格式的评论列表解析
- 新增 `_humanize` 和 `_humanize_content` 方法,去除 AI 生成内容的书面痕迹
- 调整多个提示词模板,强调真人化、口语化的写作风格,避免 AI 特征
- 提高生成回复和评论时的温度参数,增加输出多样性
2026-02-09 21:20:14 +08:00

2880 lines
112 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
小红书 AI 爆文生产工坊 V2.0
全自动工作台:灵感 -> 文案 -> 绘图 -> 发布 -> 运营
"""
import gradio as gr
import os
import re
import json
import time
import logging
import platform
import subprocess
import threading
import random
from datetime import datetime
from PIL import Image
import matplotlib
import matplotlib.pyplot as plt
from config_manager import ConfigManager, OUTPUT_DIR
from llm_service import LLMService
from sd_service import SDService, DEFAULT_NEGATIVE
from mcp_client import MCPClient, get_mcp_client
# ================= matplotlib 中文字体配置 =================
_font_candidates = ["Microsoft YaHei", "SimHei", "PingFang SC", "WenQuanYi Micro Hei"]
for _fn in _font_candidates:
try:
matplotlib.font_manager.findfont(_fn, fallback_to_default=False)
plt.rcParams["font.sans-serif"] = [_fn]
break
except Exception:
continue
plt.rcParams["axes.unicode_minus"] = False
# ================= 日志配置 =================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("autobot.log", encoding="utf-8"),
],
)
logger = logging.getLogger("autobot")
# 强制不走代理连接本地服务
os.environ["NO_PROXY"] = "127.0.0.1,localhost"
# ================= 全局服务初始化 =================
cfg = ConfigManager()
cfg.ensure_workspace()
mcp = get_mcp_client(cfg.get("mcp_url", "http://localhost:18060/mcp"))
# ==================================================
# LLM 多提供商管理
# ==================================================
def _get_llm_config() -> tuple[str, str, str]:
"""获取当前激活 LLM 的 (api_key, base_url, model)"""
p = cfg.get_active_llm()
if p:
return p["api_key"], p["base_url"], cfg.get("model", "")
return "", "", ""
def connect_llm(provider_name):
"""连接选中的 LLM 提供商并获取模型列表"""
if not provider_name:
return gr.update(choices=[], value=None), "⚠️ 请先选择或添加 LLM 提供商"
cfg.set_active_llm(provider_name)
p = cfg.get_active_llm()
if not p:
return gr.update(choices=[], value=None), "❌ 未找到该提供商配置"
try:
svc = LLMService(p["api_key"], p["base_url"])
models = svc.get_models()
if models:
return (
gr.update(choices=models, value=models[0]),
f"✅ 已连接「{provider_name}」,加载 {len(models)} 个模型",
)
else:
# API 无法获取模型列表,保留手动输入
current_model = cfg.get("model", "")
return (
gr.update(choices=[current_model] if current_model else [], value=current_model or None),
f"⚠️ 已连接「{provider_name}」,但未获取到模型列表,请手动输入模型名",
)
except Exception as e:
logger.error("LLM 连接失败: %s", e)
current_model = cfg.get("model", "")
return (
gr.update(choices=[current_model] if current_model else [], value=current_model or None),
f"❌ 连接「{provider_name}」失败: {e}",
)
def add_llm_provider(name, api_key, base_url):
"""添加新的 LLM 提供商"""
msg = cfg.add_llm_provider(name, api_key, base_url)
names = cfg.get_llm_provider_names()
active = cfg.get("active_llm", "")
return (
gr.update(choices=names, value=active),
msg,
)
def remove_llm_provider(provider_name):
"""删除 LLM 提供商"""
if not provider_name:
return gr.update(choices=cfg.get_llm_provider_names(), value=cfg.get("active_llm", "")), "⚠️ 请先选择要删除的提供商"
msg = cfg.remove_llm_provider(provider_name)
names = cfg.get_llm_provider_names()
active = cfg.get("active_llm", "")
return (
gr.update(choices=names, value=active),
msg,
)
def on_provider_selected(provider_name):
"""切换 LLM 提供商时更新显示信息"""
if not provider_name:
return "未选择提供商"
for p in cfg.get_llm_providers():
if p["name"] == provider_name:
cfg.set_active_llm(provider_name)
masked_key = p["api_key"][:8] + "***" if len(p["api_key"]) > 8 else "***"
return f"**{provider_name}** \nAPI Key: `{masked_key}` \nBase URL: `{p['base_url']}`"
return "未找到该提供商"
# ==================================================
# Tab 1: 内容创作
# ==================================================
def connect_sd(sd_url):
"""连接 SD 并获取模型列表"""
try:
svc = SDService(sd_url)
ok, msg = svc.check_connection()
if ok:
models = svc.get_models()
cfg.set("sd_url", sd_url)
return gr.update(choices=models, value=models[0] if models else None), f"{msg}"
return gr.update(choices=[]), f"{msg}"
except Exception as e:
logger.error("SD 连接失败: %s", e)
return gr.update(choices=[]), f"❌ SD 连接失败: {e}"
def check_mcp_status(mcp_url):
"""检查 MCP 连接状态"""
try:
client = get_mcp_client(mcp_url)
ok, msg = client.check_connection()
if ok:
cfg.set("mcp_url", mcp_url)
return f"✅ MCP 服务正常 - {msg}"
return f"{msg}"
except Exception as e:
return f"❌ MCP 连接失败: {e}"
# ==================================================
# 小红书账号登录
# ==================================================
def get_login_qrcode(mcp_url):
"""获取小红书登录二维码"""
try:
client = get_mcp_client(mcp_url)
result = client.get_login_qrcode()
if "error" in result:
return None, f"❌ 获取二维码失败: {result['error']}"
qr_image = result.get("qr_image")
msg = result.get("text", "")
if qr_image:
return qr_image, f"✅ 二维码已生成,请用小红书 App 扫码\n{msg}"
return None, f"⚠️ 未获取到二维码图片MCP 返回:\n{msg}"
except Exception as e:
logger.error("获取登录二维码失败: %s", e)
return None, f"❌ 获取二维码失败: {e}"
def logout_xhs(mcp_url):
"""退出登录:清除 cookies 并重置本地 token"""
try:
client = get_mcp_client(mcp_url)
result = client.delete_cookies()
if "error" in result:
return f"❌ 退出失败: {result['error']}"
cfg.set("xsec_token", "")
client._reset()
return "✅ 已退出登录,可以重新扫码登录"
except Exception as e:
logger.error("退出登录失败: %s", e)
return f"❌ 退出失败: {e}"
def _auto_fetch_xsec_token(mcp_url) -> str:
"""从推荐列表自动获取一个有效的 xsec_token"""
try:
client = get_mcp_client(mcp_url)
entries = client.list_feeds_parsed()
for e in entries:
token = e.get("xsec_token", "")
if token:
return token
except Exception as e:
logger.warning("自动获取 xsec_token 失败: %s", e)
return ""
def check_login(mcp_url):
"""检查登录状态,登录成功后自动获取 xsec_token 并保存"""
try:
client = get_mcp_client(mcp_url)
result = client.check_login_status()
if "error" in result:
return f"{result['error']}", gr.update(), gr.update()
text = result.get("text", "")
if "未登录" in text:
return f"🔴 {text}", gr.update(), gr.update()
# 登录成功 → 自动获取 xsec_token
token = _auto_fetch_xsec_token(mcp_url)
if token:
cfg.set("xsec_token", token)
logger.info("自动获取 xsec_token 成功")
return (
f"🟢 {text}\n\n✅ xsec_token 已自动获取并保存",
gr.update(value=cfg.get("my_user_id", "")),
gr.update(value=token),
)
return f"🟢 {text}\n\n⚠️ 自动获取 xsec_token 失败,请手动刷新", gr.update(), gr.update()
except Exception as e:
return f"❌ 检查登录状态失败: {e}", gr.update(), gr.update()
def save_my_user_id(user_id_input):
"""保存用户 ID (验证 24 位十六进制格式)"""
uid = (user_id_input or "").strip()
if not uid:
cfg.set("my_user_id", "")
return "⚠️ 已清除用户 ID"
if not re.match(r'^[0-9a-fA-F]{24}$', uid):
return (
"❌ 格式错误!用户 ID 应为 24 位十六进制字符串\n"
f"你输入的: `{uid}` ({len(uid)} 位)\n\n"
"💡 如果你输入的是小红书号 (纯数字如 18688457507),那不是 userId。"
)
cfg.set("my_user_id", uid)
return f"✅ 用户 ID 已保存: `{uid}`"
def generate_copy(model, topic, style):
"""生成文案"""
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "", "", "", "", "❌ 请先配置并连接 LLM 提供商"
try:
svc = LLMService(api_key, base_url, model)
data = svc.generate_copy(topic, style)
cfg.set("model", model)
tags = data.get("tags", [])
return (
data.get("title", ""),
data.get("content", ""),
data.get("sd_prompt", ""),
", ".join(tags) if tags else "",
"✅ 文案生成完毕",
)
except Exception as e:
logger.error("文案生成失败: %s", e)
return "", "", "", "", f"❌ 生成失败: {e}"
def generate_images(sd_url, prompt, neg_prompt, model, steps, cfg_scale):
"""生成图片"""
if not model:
return None, [], "❌ 未选择 SD 模型"
try:
svc = SDService(sd_url)
images = svc.txt2img(
prompt=prompt,
negative_prompt=neg_prompt,
model=model,
steps=int(steps),
cfg_scale=float(cfg_scale),
)
return images, images, f"✅ 生成 {len(images)} 张图片"
except Exception as e:
logger.error("图片生成失败: %s", e)
return None, [], f"❌ 绘图失败: {e}"
def one_click_export(title, content, images):
"""导出文案和图片到本地"""
if not title:
return "❌ 无法导出:没有标题"
safe_title = re.sub(r'[\\/*?:"<>|]', "", title)[:20]
folder_name = f"{int(time.time())}_{safe_title}"
folder_path = os.path.join(OUTPUT_DIR, folder_name)
os.makedirs(folder_path, exist_ok=True)
with open(os.path.join(folder_path, "文案.txt"), "w", encoding="utf-8") as f:
f.write(f"{title}\n\n{content}")
saved_paths = []
if images:
for idx, img in enumerate(images):
path = os.path.join(folder_path, f"{idx+1}.png")
if isinstance(img, Image.Image):
img.save(path)
saved_paths.append(os.path.abspath(path))
# 尝试打开文件夹
try:
abs_path = os.path.abspath(folder_path)
if platform.system() == "Windows":
os.startfile(abs_path)
elif platform.system() == "Darwin":
subprocess.call(["open", abs_path])
else:
subprocess.call(["xdg-open", abs_path])
except Exception:
pass
return f"✅ 已导出至: {folder_path} ({len(saved_paths)} 张图片)"
def publish_to_xhs(title, content, tags_str, images, local_images, mcp_url, schedule_time):
"""通过 MCP 发布到小红书"""
if not title:
return "❌ 缺少标题"
client = get_mcp_client(mcp_url)
# 收集图片路径
image_paths = []
# 先保存 AI 生成的图片到临时目录
if images:
temp_dir = os.path.join(OUTPUT_DIR, "_temp_publish")
os.makedirs(temp_dir, exist_ok=True)
for idx, img in enumerate(images):
if isinstance(img, Image.Image):
path = os.path.abspath(os.path.join(temp_dir, f"ai_{idx}.png"))
img.save(path)
image_paths.append(path)
# 添加本地上传的图片
if local_images:
for img_file in local_images:
# Gradio File 组件返回的是 NamedString 或 tempfile path
img_path = img_file.name if hasattr(img_file, 'name') else str(img_file)
if os.path.exists(img_path):
image_paths.append(os.path.abspath(img_path))
if not image_paths:
return "❌ 至少需要 1 张图片才能发布"
# 解析标签
tags = [t.strip().lstrip("#") for t in tags_str.split(",") if t.strip()] if tags_str else None
# 定时发布
schedule = schedule_time if schedule_time and schedule_time.strip() else None
try:
result = client.publish_content(
title=title,
content=content,
images=image_paths,
tags=tags,
schedule_at=schedule,
)
if "error" in result:
return f"❌ 发布失败: {result['error']}"
return f"✅ 发布成功!\n{result.get('text', '')}"
except Exception as e:
logger.error("发布失败: %s", e)
return f"❌ 发布异常: {e}"
# ==================================================
# Tab 2: 热点探测
# ==================================================
def search_hotspots(keyword, sort_by, mcp_url):
"""搜索小红书热门内容"""
if not keyword:
return "❌ 请输入搜索关键词", ""
try:
client = get_mcp_client(mcp_url)
result = client.search_feeds(keyword, sort_by=sort_by)
if "error" in result:
return f"❌ 搜索失败: {result['error']}", ""
text = result.get("text", "无结果")
return "✅ 搜索完成", text
except Exception as e:
logger.error("热点搜索失败: %s", e)
return f"❌ 搜索失败: {e}", ""
def analyze_and_suggest(model, keyword, search_result):
"""AI 分析热点并给出建议"""
if not search_result:
return "❌ 请先搜索", "", ""
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "❌ 请先配置 LLM 提供商", "", ""
try:
svc = LLMService(api_key, base_url, model)
analysis = svc.analyze_hotspots(search_result)
topics = "\n".join(f"{t}" for t in analysis.get("hot_topics", []))
patterns = "\n".join(f"{p}" for p in analysis.get("title_patterns", []))
suggestions = "\n".join(
f"**{s['topic']}** - {s['reason']}"
for s in analysis.get("suggestions", [])
)
structure = analysis.get("content_structure", "")
summary = (
f"## 🔥 热门选题\n{topics}\n\n"
f"## 📝 标题套路\n{patterns}\n\n"
f"## 📐 内容结构\n{structure}\n\n"
f"## 💡 推荐选题\n{suggestions}"
)
return "✅ 分析完成", summary, keyword
except Exception as e:
logger.error("热点分析失败: %s", e)
return f"❌ 分析失败: {e}", "", ""
def generate_from_hotspot(model, topic_from_hotspot, style, search_result):
"""基于热点分析生成文案"""
if not topic_from_hotspot:
return "", "", "", "", "❌ 请先选择或输入选题"
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "", "", "", "", "❌ 请先配置 LLM 提供商"
try:
svc = LLMService(api_key, base_url, model)
data = svc.generate_copy_with_reference(
topic=topic_from_hotspot,
style=style,
reference_notes=search_result[:2000], # 截断防止超长
)
tags = data.get("tags", [])
return (
data.get("title", ""),
data.get("content", ""),
data.get("sd_prompt", ""),
", ".join(tags),
"✅ 基于热点的文案已生成",
)
except Exception as e:
return "", "", "", "", f"❌ 生成失败: {e}"
# ==================================================
# Tab 3: 评论管家
# ==================================================
# ---- 共用: 笔记列表缓存 ----
# 主动评论缓存
_cached_proactive_entries: list[dict] = []
# 我的笔记评论缓存
_cached_my_note_entries: list[dict] = []
def _fetch_and_cache(keyword, mcp_url, cache_name="proactive"):
"""通用: 获取笔记列表并缓存"""
global _cached_proactive_entries, _cached_my_note_entries
try:
client = get_mcp_client(mcp_url)
if keyword and keyword.strip():
entries = client.search_feeds_parsed(keyword.strip())
src = f"搜索「{keyword.strip()}"
else:
entries = client.list_feeds_parsed()
src = "首页推荐"
if cache_name == "proactive":
_cached_proactive_entries = entries
else:
_cached_my_note_entries = entries
if not entries:
return gr.update(choices=[], value=None), f"⚠️ 从{src}未找到笔记"
choices = []
for i, e in enumerate(entries):
title_short = (e["title"] or "无标题")[:28]
label = f"[{i+1}] {title_short} | @{e['author'] or '未知'} | ❤ {e['likes']}"
choices.append(label)
return (
gr.update(choices=choices, value=choices[0]),
f"✅ 从{src}获取 {len(entries)} 条笔记",
)
except Exception as e:
if cache_name == "proactive":
_cached_proactive_entries = []
else:
_cached_my_note_entries = []
return gr.update(choices=[], value=None), f"{e}"
def _pick_from_cache(selected, cache_name="proactive"):
"""通用: 从缓存中提取选中条目的 feed_id / xsec_token / title"""
cache = _cached_proactive_entries if cache_name == "proactive" else _cached_my_note_entries
if not selected or not cache:
return "", "", ""
try:
# 尝试从 [N] 前缀提取序号
idx = int(selected.split("]")[0].replace("[", "")) - 1
if 0 <= idx < len(cache):
e = cache[idx]
return e["feed_id"], e["xsec_token"], e.get("title", "")
except (ValueError, IndexError):
pass
# 回退: 模糊匹配标题
for e in cache:
if e.get("title", "")[:15] in selected:
return e["feed_id"], e["xsec_token"], e.get("title", "")
return "", "", ""
# ---- 模块 A: 主动评论他人 ----
def fetch_proactive_notes(keyword, mcp_url):
return _fetch_and_cache(keyword, mcp_url, "proactive")
def on_proactive_note_selected(selected):
return _pick_from_cache(selected, "proactive")
def load_note_for_comment(feed_id, xsec_token, mcp_url):
"""加载目标笔记详情 (标题+正文+已有评论), 用于 AI 分析"""
if not feed_id or not xsec_token:
return "❌ 请先选择笔记", "", "", ""
try:
client = get_mcp_client(mcp_url)
result = client.get_feed_detail(feed_id, xsec_token, load_all_comments=True)
if "error" in result:
return f"{result['error']}", "", "", ""
full_text = result.get("text", "")
# 尝试分离正文和评论
if "评论" in full_text:
parts = full_text.split("评论", 1)
content_part = parts[0].strip()
comments_part = "评论" + parts[1] if len(parts) > 1 else ""
else:
content_part = full_text[:500]
comments_part = ""
return "✅ 笔记内容已加载", content_part[:800], comments_part[:1500], full_text
except Exception as e:
return f"{e}", "", "", ""
def ai_generate_comment(model, persona,
post_title, post_content, existing_comments):
"""AI 生成主动评论"""
persona = _resolve_persona(persona)
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "⚠️ 请先配置 LLM 提供商", "❌ LLM 未配置"
if not model:
return "⚠️ 请先连接 LLM", "❌ 未选模型"
if not post_title and not post_content:
return "⚠️ 请先加载笔记内容", "❌ 无笔记内容"
try:
svc = LLMService(api_key, base_url, model)
comment = svc.generate_proactive_comment(
persona, post_title, post_content[:600], existing_comments[:800]
)
return comment, "✅ 评论已生成"
except Exception as e:
logger.error(f"AI 评论生成失败: {e}")
return f"生成失败: {e}", f"{e}"
def send_comment(feed_id, xsec_token, comment_content, mcp_url):
"""发送评论到别人的笔记"""
if not all([feed_id, xsec_token, comment_content]):
return "❌ 缺少必要参数 (笔记ID / token / 评论内容)"
try:
client = get_mcp_client(mcp_url)
result = client.post_comment(feed_id, xsec_token, comment_content)
if "error" in result:
return f"{result['error']}"
return "✅ 评论已发送!"
except Exception as e:
return f"{e}"
# ---- 模块 B: 回复我的笔记评论 ----
def fetch_my_notes(mcp_url):
"""通过已保存的 userId 获取我的笔记列表"""
global _cached_my_note_entries
my_uid = cfg.get("my_user_id", "")
xsec = cfg.get("xsec_token", "")
if not my_uid:
return (
gr.update(choices=[], value=None),
"❌ 未配置用户 ID请先到「账号登录」页填写并保存",
)
if not xsec:
return (
gr.update(choices=[], value=None),
"❌ 未获取 xsec_token请先登录",
)
try:
client = get_mcp_client(mcp_url)
result = client.get_user_profile(my_uid, xsec)
if "error" in result:
return gr.update(choices=[], value=None), f"{result['error']}"
# 从 raw 中解析 feeds
raw = result.get("raw", {})
text = result.get("text", "")
data = None
if raw and isinstance(raw, dict):
for item in raw.get("content", []):
if item.get("type") == "text":
try:
data = json.loads(item["text"])
except (json.JSONDecodeError, KeyError):
pass
if not data:
try:
data = json.loads(text)
except (json.JSONDecodeError, TypeError):
pass
feeds = (data or {}).get("feeds") or []
if not feeds:
return (
gr.update(choices=[], value=None),
"⚠️ 未找到你的笔记,可能账号还没有发布内容",
)
entries = []
for f in feeds:
nc = f.get("noteCard") or {}
user = nc.get("user") or {}
interact = nc.get("interactInfo") or {}
entries.append({
"feed_id": f.get("id", ""),
"xsec_token": f.get("xsecToken", ""),
"title": nc.get("displayTitle", "未知标题"),
"author": user.get("nickname", user.get("nickName", "")),
"user_id": user.get("userId", ""),
"likes": interact.get("likedCount", "0"),
"type": nc.get("type", ""),
})
_cached_my_note_entries = entries
choices = [
f"[{i+1}] {e['title'][:20]} | {e['type']} | ❤{e['likes']}"
for i, e in enumerate(entries)
]
return (
gr.update(choices=choices, value=choices[0] if choices else None),
f"✅ 找到 {len(entries)} 篇笔记",
)
except Exception as e:
return gr.update(choices=[], value=None), f"{e}"
def on_my_note_selected(selected):
return _pick_from_cache(selected, "my_notes")
def fetch_my_note_comments(feed_id, xsec_token, mcp_url):
"""获取我的笔记的评论列表"""
if not feed_id or not xsec_token:
return "❌ 请先选择笔记", ""
try:
client = get_mcp_client(mcp_url)
result = client.get_feed_detail(feed_id, xsec_token, load_all_comments=True)
if "error" in result:
return f"{result['error']}", ""
return "✅ 评论加载完成", result.get("text", "暂无评论")
except Exception as e:
return f"{e}", ""
def ai_reply_comment(model, persona, post_title, comment_text):
"""AI 生成评论回复"""
persona = _resolve_persona(persona)
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "⚠️ 请先配置 LLM 提供商", "❌ LLM 未配置"
if not model:
return "⚠️ 请先连接 LLM 并选择模型", "❌ 未选择模型"
if not comment_text:
return "请输入需要回复的评论内容", "⚠️ 请输入评论"
try:
svc = LLMService(api_key, base_url, model)
reply = svc.generate_reply(persona, post_title, comment_text)
return reply, "✅ 回复已生成"
except Exception as e:
logger.error(f"AI 回复生成失败: {e}")
return f"生成失败: {e}", f"{e}"
def send_reply(feed_id, xsec_token, reply_content, mcp_url):
"""发送评论回复"""
if not all([feed_id, xsec_token, reply_content]):
return "❌ 缺少必要参数"
try:
client = get_mcp_client(mcp_url)
result = client.post_comment(feed_id, xsec_token, reply_content)
if "error" in result:
return f"❌ 回复失败: {result['error']}"
return "✅ 回复已发送"
except Exception as e:
return f"❌ 发送失败: {e}"
# ==================================================
# Tab 4: 数据看板 (我的账号)
# ==================================================
def _parse_profile_json(text: str):
"""尝试从文本中解析用户 profile JSON"""
if not text:
return None
# 直接 JSON
try:
return json.loads(text)
except (json.JSONDecodeError, TypeError):
pass
# 可能包含 Markdown 代码块
m = re.search(r'```(?:json)?\s*\n([\s\S]+?)\n```', text)
if m:
try:
return json.loads(m.group(1))
except (json.JSONDecodeError, TypeError):
pass
return None
def _parse_count(val) -> float:
"""解析数字字符串, 支持 '1.2万' 格式"""
if isinstance(val, (int, float)):
return float(val)
s = str(val).strip()
if "" in s:
try:
return float(s.replace("", "")) * 10000
except ValueError:
pass
try:
return float(s)
except ValueError:
return 0.0
def fetch_my_profile(user_id, xsec_token, mcp_url):
"""获取我的账号数据, 返回结构化信息 + 可视化图表"""
if not user_id or not xsec_token:
return "❌ 请填写你的用户 ID 和 xsec_token", "", None, None, None
try:
client = get_mcp_client(mcp_url)
result = client.get_user_profile(user_id, xsec_token)
if "error" in result:
return f"{result['error']}", "", None, None, None
raw = result.get("raw", {})
text = result.get("text", "")
# 尝试从 raw 或 text 解析 JSON
data = None
if raw and isinstance(raw, dict):
content_list = raw.get("content", [])
for item in content_list:
if item.get("type") == "text":
data = _parse_profile_json(item.get("text", ""))
if data:
break
if not data:
data = _parse_profile_json(text)
if not data:
return "✅ 数据加载完成 (纯文本)", text, None, None, None
# ---- 提取基本信息 (注意 MCP 对新号可能返回 null) ----
basic = data.get("userBasicInfo") or {}
interactions = data.get("interactions") or []
feeds = data.get("feeds") or []
gender_map = {0: "未知", 1: "", 2: ""}
info_lines = [
f"## 👤 {basic.get('nickname', '未知')}",
f"- **小红书号**: {basic.get('redId', '-')}",
f"- **性别**: {gender_map.get(basic.get('gender', 0), '未知')}",
f"- **IP 属地**: {basic.get('ipLocation', '-')}",
f"- **简介**: {basic.get('desc', '-')}",
"",
"### 📊 核心数据",
]
for inter in interactions:
info_lines.append(f"- **{inter.get('name', '')}**: {inter.get('count', '0')}")
info_lines.append(f"\n### 📝 展示笔记: {len(feeds)}")
profile_md = "\n".join(info_lines)
# ---- 互动数据柱状图 ----
fig_interact = None
if interactions:
inter_data = {i["name"]: _parse_count(i["count"]) for i in interactions}
fig_interact, ax = plt.subplots(figsize=(4, 3), dpi=100)
labels = list(inter_data.keys())
values = list(inter_data.values())
colors = ["#FF6B6B", "#4ECDC4", "#45B7D1"][:len(labels)]
ax.bar(labels, values, color=colors, edgecolor="white", linewidth=0.5)
ax.set_title("账号核心指标", fontsize=12, fontweight="bold")
for i, v in enumerate(values):
display = f"{v/10000:.1f}" if v >= 10000 else str(int(v))
ax.text(i, v + max(values) * 0.02, display, ha="center", fontsize=9)
ax.set_ylabel("")
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
fig_interact.tight_layout()
# ---- 笔记点赞分布图 ----
fig_notes = None
if feeds:
titles, likes = [], []
for f in feeds[:15]:
nc = f.get("noteCard") or {}
t = (nc.get("displayTitle", "") or "无标题")[:12]
lk = _parse_count((nc.get("interactInfo") or {}).get("likedCount", "0"))
titles.append(t)
likes.append(lk)
fig_notes, ax2 = plt.subplots(figsize=(7, 3.5), dpi=100)
ax2.barh(range(len(titles)), likes, color="#FF6B6B", edgecolor="white")
ax2.set_yticks(range(len(titles)))
ax2.set_yticklabels(titles, fontsize=8)
ax2.set_title(f"笔记点赞排行 (Top {len(titles)})", fontsize=12, fontweight="bold")
ax2.invert_yaxis()
for i, v in enumerate(likes):
display = f"{v/10000:.1f}" if v >= 10000 else str(int(v))
ax2.text(v + max(likes) * 0.01 if max(likes) > 0 else 0, i, display, va="center", fontsize=8)
ax2.spines["top"].set_visible(False)
ax2.spines["right"].set_visible(False)
fig_notes.tight_layout()
# ---- 笔记详情表格 (Markdown) ----
table_lines = [
"### 📋 笔记数据明细",
"| # | 标题 | 类型 | ❤ 点赞 |",
"|---|------|------|--------|",
]
for i, f in enumerate(feeds):
nc = f.get("noteCard") or {}
t = (nc.get("displayTitle", "") or "无标题")[:25]
tp = "📹 视频" if nc.get("type") == "video" else "📷 图文"
lk = (nc.get("interactInfo") or {}).get("likedCount", "0")
table_lines.append(f"| {i+1} | {t} | {tp} | {lk} |")
notes_table = "\n".join(table_lines)
return "✅ 数据加载完成", profile_md, fig_interact, fig_notes, notes_table
except Exception as e:
logger.error(f"获取我的数据失败: {e}")
return f"{e}", "", None, None, None
# ==================================================
# 自动化运营模块
# ==================================================
# 自动化状态
_auto_running = threading.Event()
_auto_thread: threading.Thread | None = None
_auto_log: list[str] = []
# ---- 操作记录:防重复 & 每日统计 ----
_op_history = {
"commented_feeds": set(), # 已评论的 feed_id
"replied_comments": set(), # 已回复的 comment_id
"liked_feeds": set(), # 已点赞的 feed_id
"favorited_feeds": set(), # 已收藏的 feed_id
}
_daily_stats = {
"date": "",
"comments": 0,
"likes": 0,
"favorites": 0,
"publishes": 0,
"replies": 0,
"errors": 0,
}
# 每日操作上限
DAILY_LIMITS = {
"comments": 30,
"likes": 80,
"favorites": 50,
"publishes": 8,
"replies": 40,
}
# 连续错误计数 → 冷却
_consecutive_errors = 0
_error_cooldown_until = 0.0
def _reset_daily_stats_if_needed():
"""每天自动重置统计"""
today = datetime.now().strftime("%Y-%m-%d")
if _daily_stats["date"] != today:
_daily_stats.update({
"date": today, "comments": 0, "likes": 0,
"favorites": 0, "publishes": 0, "replies": 0, "errors": 0,
})
# 每日重置历史记录(允许隔天重复互动)
for k in _op_history:
_op_history[k].clear()
def _check_daily_limit(op_type: str) -> bool:
"""检查是否超出每日限额"""
_reset_daily_stats_if_needed()
limit = DAILY_LIMITS.get(op_type, 999)
current = _daily_stats.get(op_type, 0)
return current < limit
def _increment_stat(op_type: str):
"""增加操作计数"""
_reset_daily_stats_if_needed()
_daily_stats[op_type] = _daily_stats.get(op_type, 0) + 1
def _record_error():
"""记录错误,连续错误触发冷却"""
global _consecutive_errors, _error_cooldown_until
_consecutive_errors += 1
_daily_stats["errors"] = _daily_stats.get("errors", 0) + 1
if _consecutive_errors >= 3:
cooldown = min(60 * _consecutive_errors, 600) # 最多冷却10分钟
_error_cooldown_until = time.time() + cooldown
_auto_log_append(f"⚠️ 连续 {_consecutive_errors} 次错误,冷却 {cooldown}s")
def _clear_error_streak():
"""操作成功后清除连续错误记录"""
global _consecutive_errors
_consecutive_errors = 0
def _is_in_cooldown() -> bool:
"""检查是否在错误冷却期"""
return time.time() < _error_cooldown_until
def _is_in_operating_hours(start_hour: int = 7, end_hour: int = 23) -> bool:
"""检查是否在运营时间段"""
now_hour = datetime.now().hour
return start_hour <= now_hour < end_hour
def _get_stats_summary() -> str:
"""获取今日运营统计摘要"""
_reset_daily_stats_if_needed()
s = _daily_stats
lines = [
f"📊 **今日运营统计** ({s['date']})",
f"- 💬 评论: {s['comments']}/{DAILY_LIMITS['comments']}",
f"- ❤️ 点赞: {s['likes']}/{DAILY_LIMITS['likes']}",
f"- ⭐ 收藏: {s['favorites']}/{DAILY_LIMITS['favorites']}",
f"- 🚀 发布: {s['publishes']}/{DAILY_LIMITS['publishes']}",
f"- 💌 回复: {s['replies']}/{DAILY_LIMITS['replies']}",
f"- ❌ 错误: {s['errors']}",
]
return "\n".join(lines)
# ================= 人设池 =================
DEFAULT_PERSONAS = [
"温柔知性的时尚博主,喜欢分享日常穿搭和生活美学",
"元气满满的大学生,热爱探店和平价好物分享",
"30岁都市白领丽人专注通勤穿搭和职场干货",
"精致妈妈,分享育儿经验和家居收纳技巧",
"文艺青年摄影师,喜欢记录旅行和城市角落",
"健身达人营养师,专注减脂餐和运动分享",
"资深美妆博主,擅长化妆教程和护肤测评",
"独居女孩,分享租房改造和独居生活仪式感",
"甜品烘焙爱好者,热衷分享自制甜点和下午茶",
"数码科技女生专注好用App和电子产品测评",
"小镇姑娘在大城市打拼,分享省钱攻略和成长日记",
"中医养生爱好者,分享节气养生和食疗方子",
"二次元coser喜欢分享cos日常和动漫周边",
"北漂程序媛,分享高效工作法和解压生活",
"复古穿搭博主热爱vintage风和中古饰品",
"考研上岸学姐,分享学习方法和备考经验",
"新手养猫人,记录和毛孩子的日常生活",
"咖啡重度爱好者,探遍城市独立咖啡馆",
"极简主义生活家,倡导断舍离和高质量生活",
"汉服爱好者,分享传统文化和国风穿搭",
"插画师小姐姐,分享手绘过程和创作灵感",
"海归女孩,分享中西文化差异和海外生活见闻",
"瑜伽老师,分享身心灵修行和自律生活",
"美甲设计师,分享流行甲型和美甲教程",
"家居软装设计师,分享小户型改造和氛围感布置",
]
RANDOM_PERSONA_LABEL = "🎲 随机人设(每次自动切换)"
# ================= 主题池 =================
DEFAULT_TOPICS = [
# 穿搭类
"春季穿搭", "通勤穿搭", "约会穿搭", "显瘦穿搭", "小个子穿搭",
"学生党穿搭", "韩系穿搭", "日系穿搭", "法式穿搭", "极简穿搭",
"国风穿搭", "运动穿搭", "闺蜜穿搭", "梨形身材穿搭", "微胖穿搭",
"氛围感穿搭", "一衣多穿", "秋冬叠穿", "夏日清凉穿搭",
# 美妆护肤类
"护肤心得", "妆容教程", "学生党平价护肤", "敏感肌护肤",
"抗老护肤", "美白攻略", "眼妆教程", "唇妆合集", "底妆测评",
"防晒测评", "早C晚A护肤", "成分党护肤", "换季护肤",
# 美食类
"减脂餐分享", "一人食食谱", "宿舍美食", "烘焙教程", "家常菜做法",
"探店打卡", "咖啡探店", "早餐食谱", "下午茶推荐", "火锅推荐",
"奶茶测评", "便当制作", "0失败甜品",
# 生活家居类
"好物分享", "平价好物", "居家好物", "收纳技巧", "租房改造",
"小户型装修", "氛围感房间", "香薰推荐", "桌面布置", "断舍离",
# 旅行出行类
"旅行攻略", "周末去哪玩", "小众旅行地", "拍照打卡地", "露营攻略",
"自驾游攻略", "古镇旅行", "海岛度假", "城市citywalk",
# 学习成长类
"书单推荐", "自律生活", "时间管理", "考研经验", "英语学习方法",
"理财入门", "副业分享", "简历优化", "面试技巧",
# 数码科技类
"iPad生产力", "手机摄影技巧", "好用App推荐", "电子产品测评",
# 健身运动类
"居家健身", "帕梅拉跟练", "跑步入门", "瑜伽入门", "体态矫正",
# 宠物类
"养猫日常", "养狗经验", "宠物好物", "新手养宠指南",
# 情感心理类
"独居生活", "emo急救指南", "社恐自救", "女性成长", "情绪管理",
]
DEFAULT_STYLES = [
"好物种草", "干货教程", "情绪共鸣", "生活Vlog", "测评避雷",
"知识科普", "经验分享", "清单合集", "对比测评", "沉浸式体验",
]
# ================= 评论关键词池 =================
DEFAULT_COMMENT_KEYWORDS = [
# 穿搭时尚
"穿搭", "ootd", "早春穿搭", "通勤穿搭", "显瘦", "小个子穿搭",
# 美妆护肤
"护肤", "化妆教程", "平价护肤", "防晒", "美白", "眼影",
# 美食
"美食", "减脂餐", "探店", "咖啡", "烘焙", "食谱",
# 生活好物
"好物推荐", "平价好物", "居家好物", "收纳", "租房改造",
# 旅行
"旅行", "攻略", "打卡", "周末去哪玩", "露营",
# 学习成长
"自律", "书单", "考研", "英语学习", "副业",
# 生活日常
"生活日常", "独居", "vlog", "仪式感", "解压",
# 健身
"减脂", "健身", "瑜伽", "体态",
# 宠物
"养猫", "养狗", "宠物",
]
def _auto_log_append(msg: str):
"""记录自动化日志"""
ts = datetime.now().strftime("%H:%M:%S")
entry = f"[{ts}] {msg}"
_auto_log.append(entry)
if len(_auto_log) > 500:
_auto_log[:] = _auto_log[-300:]
logger.info("[自动化] %s", msg)
def _resolve_persona(persona_text: str) -> str:
"""解析人设:如果是随机人设则从池中随机选一个,否则原样返回"""
if not persona_text or persona_text == RANDOM_PERSONA_LABEL:
chosen = random.choice(DEFAULT_PERSONAS)
_auto_log_append(f"🎭 本次人设: {chosen[:20]}...")
return chosen
# 检查是否选的是池中某个人设Dropdown选中
return persona_text
def _auto_comment_with_log(keywords_str, mcp_url, model, persona_text):
"""一键评论 + 同步刷新日志"""
msg = auto_comment_once(keywords_str, mcp_url, model, persona_text)
return msg, get_auto_log()
def auto_comment_once(keywords_str, mcp_url, model, persona_text):
"""一键评论:自动搜索高赞笔记 → AI生成评论 → 发送(含防重复/限额/冷却)"""
try:
if _is_in_cooldown():
return "⏳ 错误冷却中,请稍后再试"
if not _check_daily_limit("comments"):
return f"🚫 今日评论已达上限 ({DAILY_LIMITS['comments']})"
persona_text = _resolve_persona(persona_text)
keywords = [k.strip() for k in keywords_str.split(",") if k.strip()] if keywords_str else DEFAULT_COMMENT_KEYWORDS
keyword = random.choice(keywords)
_auto_log_append(f"🔍 搜索关键词: {keyword}")
client = get_mcp_client(mcp_url)
# 随机切换搜索排序,丰富互动对象
sort_options = ["最多点赞", "综合", "最新"]
sort_by = random.choice(sort_options)
# 搜索高赞笔记
entries = client.search_feeds_parsed(keyword, sort_by=sort_by)
if not entries:
_auto_log_append("⚠️ 搜索无结果,尝试推荐列表")
entries = client.list_feeds_parsed()
if not entries:
_record_error()
return "❌ 未找到任何笔记"
# 过滤掉自己的笔记 & 已评论过的笔记
my_uid = cfg.get("my_user_id", "")
entries = [
e for e in entries
if e.get("user_id") != my_uid
and e.get("feed_id") not in _op_history["commented_feeds"]
]
if not entries:
return " 搜索结果中所有笔记都已评论过,换个关键词试试"
# 从前10个中随机选择
target = random.choice(entries[:min(10, len(entries))])
feed_id = target["feed_id"]
xsec_token = target["xsec_token"]
title = target.get("title", "未知")
_auto_log_append(f"🎯 选中: {title[:30]} (@{target.get('author', '未知')}) [排序:{sort_by}]")
if not feed_id or not xsec_token:
return "❌ 笔记缺少必要参数 (feed_id/xsec_token)"
# 模拟浏览延迟
time.sleep(random.uniform(3, 8))
# 加载笔记详情
result = client.get_feed_detail(feed_id, xsec_token, load_all_comments=True)
if "error" in result:
_record_error()
return f"❌ 加载笔记失败: {result['error']}"
full_text = result.get("text", "")
if "评论" in full_text:
parts = full_text.split("评论", 1)
content_part = parts[0].strip()[:600]
comments_part = ("评论" + parts[1])[:800] if len(parts) > 1 else ""
else:
content_part = full_text[:500]
comments_part = ""
# AI 生成评论
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "❌ LLM 未配置,请先在全局设置中配置提供商"
svc = LLMService(api_key, base_url, model)
comment = svc.generate_proactive_comment(
persona_text, title, content_part, comments_part
)
_auto_log_append(f"💬 生成评论: {comment[:60]}...")
# 随机等待后发送
time.sleep(random.uniform(3, 10))
result = client.post_comment(feed_id, xsec_token, comment)
resp_text = result.get("text", "")
_auto_log_append(f"📡 MCP 响应: {resp_text[:200]}")
if "error" in result:
_record_error()
_auto_log_append(f"❌ 评论发送失败: {result['error']}")
return f"❌ 评论发送失败: {result['error']}"
# 记录成功操作
_op_history["commented_feeds"].add(feed_id)
_increment_stat("comments")
_clear_error_streak()
_auto_log_append(f"✅ 评论已发送到「{title[:20]}」 (今日第{_daily_stats['comments']}条)")
return f"✅ 已评论「{title[:25]}\n📝 评论: {comment}\n📊 今日评论: {_daily_stats['comments']}/{DAILY_LIMITS['comments']}"
except Exception as e:
_record_error()
_auto_log_append(f"❌ 一键评论异常: {e}")
return f"❌ 评论失败: {e}"
def _auto_like_with_log(keywords_str, like_count, mcp_url):
"""一键点赞 + 同步刷新日志"""
msg = auto_like_once(keywords_str, like_count, mcp_url)
return msg, get_auto_log()
def auto_like_once(keywords_str, like_count, mcp_url):
"""一键点赞:搜索/推荐笔记 → 随机选择 → 批量点赞(含防重复/限额)"""
try:
if _is_in_cooldown():
return "⏳ 错误冷却中,请稍后再试"
if not _check_daily_limit("likes"):
return f"🚫 今日点赞已达上限 ({DAILY_LIMITS['likes']})"
keywords = [k.strip() for k in keywords_str.split(",") if k.strip()] if keywords_str else DEFAULT_COMMENT_KEYWORDS
keyword = random.choice(keywords)
like_count = int(like_count) if like_count else 5
# 不超过当日剩余额度
remaining = DAILY_LIMITS["likes"] - _daily_stats.get("likes", 0)
like_count = min(like_count, remaining)
_auto_log_append(f"👍 点赞关键词: {keyword} | 目标: {like_count}")
client = get_mcp_client(mcp_url)
# 搜索笔记
entries = client.search_feeds_parsed(keyword, sort_by="综合")
if not entries:
_auto_log_append("⚠️ 搜索无结果,尝试推荐列表")
entries = client.list_feeds_parsed()
if not entries:
_record_error()
return "❌ 未找到任何笔记"
# 过滤自己的笔记 & 已点赞过的
my_uid = cfg.get("my_user_id", "")
entries = [
e for e in entries
if e.get("user_id") != my_uid
and e.get("feed_id") not in _op_history["liked_feeds"]
]
if not entries:
return " 搜索结果中所有笔记都已点赞过"
# 随机打乱,取前 N 个
random.shuffle(entries)
targets = entries[:min(like_count, len(entries))]
liked = 0
for target in targets:
feed_id = target.get("feed_id", "")
xsec_token = target.get("xsec_token", "")
title = target.get("title", "未知")[:25]
if not feed_id or not xsec_token:
continue
# 模拟浏览延迟
time.sleep(random.uniform(2, 6))
result = client.like_feed(feed_id, xsec_token)
if "error" in result:
_auto_log_append(f" ❌ 点赞失败「{title}」: {result['error']}")
else:
liked += 1
_op_history["liked_feeds"].add(feed_id)
_increment_stat("likes")
_auto_log_append(f" ❤️ 已点赞「{title}」@{target.get('author', '未知')}")
if liked > 0:
_clear_error_streak()
_auto_log_append(f"👍 点赞完成: 成功 {liked}/{len(targets)} (今日累计{_daily_stats.get('likes', 0)})")
return f"✅ 点赞完成!成功 {liked}/{len(targets)}\n📊 今日点赞: {_daily_stats.get('likes', 0)}/{DAILY_LIMITS['likes']}"
except Exception as e:
_record_error()
_auto_log_append(f"❌ 一键点赞异常: {e}")
return f"❌ 点赞失败: {e}"
def _auto_favorite_with_log(keywords_str, fav_count, mcp_url):
"""一键收藏 + 同步刷新日志"""
msg = auto_favorite_once(keywords_str, fav_count, mcp_url)
return msg, get_auto_log()
def auto_favorite_once(keywords_str, fav_count, mcp_url):
"""一键收藏:搜索优质笔记 → 随机选择 → 批量收藏(含防重复/限额)"""
try:
if _is_in_cooldown():
return "⏳ 错误冷却中,请稍后再试"
if not _check_daily_limit("favorites"):
return f"🚫 今日收藏已达上限 ({DAILY_LIMITS['favorites']})"
keywords = [k.strip() for k in keywords_str.split(",") if k.strip()] if keywords_str else DEFAULT_COMMENT_KEYWORDS
keyword = random.choice(keywords)
fav_count = int(fav_count) if fav_count else 3
remaining = DAILY_LIMITS["favorites"] - _daily_stats.get("favorites", 0)
fav_count = min(fav_count, remaining)
_auto_log_append(f"⭐ 收藏关键词: {keyword} | 目标: {fav_count}")
client = get_mcp_client(mcp_url)
entries = client.search_feeds_parsed(keyword, sort_by="最多收藏")
if not entries:
entries = client.list_feeds_parsed()
if not entries:
_record_error()
return "❌ 未找到任何笔记"
my_uid = cfg.get("my_user_id", "")
entries = [
e for e in entries
if e.get("user_id") != my_uid
and e.get("feed_id") not in _op_history["favorited_feeds"]
]
if not entries:
return " 搜索结果中所有笔记都已收藏过"
random.shuffle(entries)
targets = entries[:min(fav_count, len(entries))]
saved = 0
for target in targets:
feed_id = target.get("feed_id", "")
xsec_token = target.get("xsec_token", "")
title = target.get("title", "未知")[:25]
if not feed_id or not xsec_token:
continue
time.sleep(random.uniform(2, 6))
result = client.favorite_feed(feed_id, xsec_token)
if "error" in result:
_auto_log_append(f" ❌ 收藏失败「{title}」: {result['error']}")
else:
saved += 1
_op_history["favorited_feeds"].add(feed_id)
_increment_stat("favorites")
_auto_log_append(f" ⭐ 已收藏「{title}」@{target.get('author', '未知')}")
if saved > 0:
_clear_error_streak()
_auto_log_append(f"⭐ 收藏完成: 成功 {saved}/{len(targets)} (今日累计{_daily_stats.get('favorites', 0)})")
return f"✅ 收藏完成!成功 {saved}/{len(targets)}\n📊 今日收藏: {_daily_stats.get('favorites', 0)}/{DAILY_LIMITS['favorites']}"
except Exception as e:
_record_error()
_auto_log_append(f"❌ 一键收藏异常: {e}")
return f"❌ 收藏失败: {e}"
def _auto_publish_with_log(topics_str, mcp_url, sd_url_val, sd_model_name, model):
"""一键发布 + 同步刷新日志"""
msg = auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model)
return msg, get_auto_log()
def _auto_reply_with_log(max_replies, mcp_url, model, persona_text):
"""一键回复 + 同步刷新日志"""
msg = auto_reply_once(max_replies, mcp_url, model, persona_text)
return msg, get_auto_log()
def auto_reply_once(max_replies, mcp_url, model, persona_text):
"""一键回复:获取我的笔记 → 加载评论 → AI 生成回复 → 发送(含防重复/限额)"""
try:
if _is_in_cooldown():
return "⏳ 错误冷却中,请稍后再试"
if not _check_daily_limit("replies"):
return f"🚫 今日回复已达上限 ({DAILY_LIMITS['replies']})"
persona_text = _resolve_persona(persona_text)
my_uid = cfg.get("my_user_id", "")
xsec = cfg.get("xsec_token", "")
if not my_uid:
return "❌ 未配置用户 ID请到「账号登录」页填写"
if not xsec:
return "❌ 未获取 xsec_token请先登录"
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "❌ LLM 未配置"
max_replies = int(max_replies) if max_replies else 3
remaining = DAILY_LIMITS["replies"] - _daily_stats.get("replies", 0)
max_replies = min(max_replies, remaining)
client = get_mcp_client(mcp_url)
_auto_log_append("💌 开始自动回复评论...")
# Step 1: 获取我的笔记列表
result = client.get_user_profile(my_uid, xsec)
if "error" in result:
_auto_log_append(f"❌ 获取我的笔记失败: {result['error']}")
return f"❌ 获取我的笔记失败: {result['error']}"
# 解析笔记列表
raw = result.get("raw", {})
text = result.get("text", "")
data = None
if raw and isinstance(raw, dict):
for item in raw.get("content", []):
if item.get("type") == "text":
try:
data = json.loads(item["text"])
except (json.JSONDecodeError, KeyError):
pass
if not data:
try:
data = json.loads(text)
except (json.JSONDecodeError, TypeError):
pass
feeds = (data or {}).get("feeds") or []
if not feeds:
_auto_log_append("⚠️ 未找到任何笔记")
return "⚠️ 未找到你的笔记"
# 构建笔记条目
my_entries = []
for f in feeds:
nc = f.get("noteCard") or {}
my_entries.append({
"feed_id": f.get("id", ""),
"xsec_token": f.get("xsecToken", ""),
"title": nc.get("displayTitle", "未知标题"),
})
_auto_log_append(f"📝 找到 {len(my_entries)} 篇笔记,开始扫描评论...")
# Step 2: 遍历笔记,找到未回复的评论
total_replied = 0
svc = LLMService(api_key, base_url, model)
for entry in my_entries:
if total_replied >= max_replies:
break
feed_id = entry["feed_id"]
xsec_token = entry["xsec_token"]
title = entry["title"]
if not feed_id or not xsec_token:
continue
time.sleep(random.uniform(1, 3))
# 加载笔记评论(使用结构化接口)
comments = client.get_feed_comments(feed_id, xsec_token, load_all=True)
if not comments:
continue
# 过滤掉自己的评论 & 已回复过的评论
other_comments = [
c for c in comments
if c.get("user_id") and c["user_id"] != my_uid and c.get("content")
and c.get("comment_id", "") not in _op_history["replied_comments"]
]
if not other_comments:
continue
_auto_log_append(f"📖「{title[:20]}」有 {len(other_comments)} 条他人评论")
for comment in other_comments:
if total_replied >= max_replies:
break
comment_id = comment.get("comment_id", "")
comment_uid = comment.get("user_id", "")
comment_text = comment.get("content", "")
nickname = comment.get("nickname", "网友")
if not comment_text.strip():
continue
_auto_log_append(f" 💬 @{nickname}: {comment_text[:40]}...")
# AI 生成回复
try:
reply = svc.generate_reply(persona_text, title, comment_text)
except Exception as e:
_auto_log_append(f" ❌ AI 回复生成失败: {e}")
continue
_auto_log_append(f" 🤖 回复: {reply[:50]}...")
# 发送回复
time.sleep(random.uniform(2, 6))
if comment_id and comment_uid:
# 使用 reply_comment 精确回复
resp = client.reply_comment(
feed_id, xsec_token, comment_id, comment_uid, reply
)
else:
# 没有 comment_id 就用 post_comment 发到笔记下
resp = client.post_comment(feed_id, xsec_token, f"@{nickname} {reply}")
resp_text = resp.get("text", "")
if "error" in resp:
_auto_log_append(f" ❌ 回复发送失败: {resp['error']}")
else:
_auto_log_append(f" ✅ 已回复 @{nickname}")
total_replied += 1
if comment_id:
_op_history["replied_comments"].add(comment_id)
_increment_stat("replies")
if total_replied > 0:
_clear_error_streak()
if total_replied == 0:
_auto_log_append(" 没有找到需要回复的新评论")
return " 没有找到需要回复的新评论\n\n💡 可能所有评论都已回复过"
else:
_auto_log_append(f"✅ 自动回复完成,共回复 {total_replied} 条 (今日累计{_daily_stats.get('replies', 0)})")
return f"✅ 自动回复完成!共回复 {total_replied} 条评论\n📊 今日回复: {_daily_stats.get('replies', 0)}/{DAILY_LIMITS['replies']}"
except Exception as e:
_record_error()
_auto_log_append(f"❌ 自动回复异常: {e}")
return f"❌ 自动回复失败: {e}"
def auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model):
"""一键发布:自动生成文案 → 生成图片 → 本地备份 → 发布到小红书(含限额)"""
try:
if _is_in_cooldown():
return "⏳ 错误冷却中,请稍后再试"
if not _check_daily_limit("publishes"):
return f"🚫 今日发布已达上限 ({DAILY_LIMITS['publishes']})"
topics = [t.strip() for t in topics_str.split(",") if t.strip()] if topics_str else DEFAULT_TOPICS
topic = random.choice(topics)
style = random.choice(DEFAULT_STYLES)
_auto_log_append(f"📝 主题: {topic} | 风格: {style}")
# 生成文案
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "❌ LLM 未配置,请先在全局设置中配置提供商"
svc = LLMService(api_key, base_url, model)
data = svc.generate_copy(topic, style)
title = (data.get("title", "") or "")[:20]
content = data.get("content", "")
sd_prompt = data.get("sd_prompt", "")
tags = data.get("tags", [])
if not title:
_record_error()
return "❌ 文案生成失败:无标题"
_auto_log_append(f"📄 文案: {title}")
# 生成图片
if not sd_url_val or not sd_model_name:
return "❌ SD WebUI 未连接或未选择模型,请先在全局设置中连接"
sd_svc = SDService(sd_url_val)
images = sd_svc.txt2img(prompt=sd_prompt, model=sd_model_name)
if not images:
_record_error()
return "❌ 图片生成失败:没有返回图片"
_auto_log_append(f"🎨 已生成 {len(images)} 张图片")
# 本地备份(同时用于发布)
ts = int(time.time())
safe_title = re.sub(r'[\\/*?:"<>|]', "", title)[:20]
backup_dir = os.path.join(OUTPUT_DIR, f"{ts}_{safe_title}")
os.makedirs(backup_dir, exist_ok=True)
# 保存文案
with open(os.path.join(backup_dir, "文案.txt"), "w", encoding="utf-8") as f:
f.write(f"标题: {title}\n风格: {style}\n主题: {topic}\n\n{content}\n\n标签: {', '.join(tags)}\n\nSD Prompt: {sd_prompt}")
image_paths = []
for idx, img in enumerate(images):
if isinstance(img, Image.Image):
path = os.path.abspath(os.path.join(backup_dir, f"{idx+1}.png"))
img.save(path)
image_paths.append(path)
if not image_paths:
return "❌ 图片保存失败"
_auto_log_append(f"💾 本地已备份至: {backup_dir}")
# 发布到小红书
client = get_mcp_client(mcp_url)
result = client.publish_content(
title=title, content=content, images=image_paths, tags=tags
)
if "error" in result:
_record_error()
_auto_log_append(f"❌ 发布失败: {result['error']} (文案已本地保存)")
return f"❌ 发布失败: {result['error']}\n💾 文案和图片已备份至: {backup_dir}"
_increment_stat("publishes")
_clear_error_streak()
# 清理 _temp_publish 中的旧临时文件
temp_dir = os.path.join(OUTPUT_DIR, "_temp_publish")
try:
if os.path.exists(temp_dir):
for f in os.listdir(temp_dir):
fp = os.path.join(temp_dir, f)
if os.path.isfile(fp) and time.time() - os.path.getmtime(fp) > 3600:
os.remove(fp)
except Exception:
pass
_auto_log_append(f"🚀 发布成功: {title} (今日第{_daily_stats['publishes']}篇)")
return f"✅ 发布成功!\n📌 标题: {title}\n💾 备份: {backup_dir}\n📊 今日发布: {_daily_stats['publishes']}/{DAILY_LIMITS['publishes']}\n{result.get('text', '')}"
except Exception as e:
_record_error()
_auto_log_append(f"❌ 一键发布异常: {e}")
return f"❌ 发布失败: {e}"
# 调度器下次执行时间追踪
_scheduler_next_times = {}
def _scheduler_loop(comment_enabled, publish_enabled, reply_enabled, like_enabled,
favorite_enabled,
comment_min, comment_max, publish_min, publish_max,
reply_min, reply_max, max_replies_per_run,
like_min, like_max, like_count_per_run,
fav_min, fav_max, fav_count_per_run,
op_start_hour, op_end_hour,
keywords, topics, mcp_url, sd_url_val, sd_model_name,
model, persona_text):
"""后台定时调度循环(含运营时段、冷却、收藏、统计)"""
_auto_log_append("🤖 自动化调度器已启动")
_auto_log_append(f"⏰ 运营时段: {int(op_start_hour)}:00 - {int(op_end_hour)}:00")
# 首次执行的随机延迟
next_comment = time.time() + random.randint(10, 60)
next_publish = time.time() + random.randint(30, 120)
next_reply = time.time() + random.randint(15, 90)
next_like = time.time() + random.randint(5, 40)
next_favorite = time.time() + random.randint(10, 50)
def _update_next_display():
"""更新下次执行时间显示"""
times = {}
if comment_enabled:
times["评论"] = datetime.fromtimestamp(next_comment).strftime("%H:%M:%S")
if like_enabled:
times["点赞"] = datetime.fromtimestamp(next_like).strftime("%H:%M:%S")
if favorite_enabled:
times["收藏"] = datetime.fromtimestamp(next_favorite).strftime("%H:%M:%S")
if reply_enabled:
times["回复"] = datetime.fromtimestamp(next_reply).strftime("%H:%M:%S")
if publish_enabled:
times["发布"] = datetime.fromtimestamp(next_publish).strftime("%H:%M:%S")
_scheduler_next_times.update(times)
_update_next_display()
while _auto_running.is_set():
now = time.time()
# 检查运营时段
if not _is_in_operating_hours(int(op_start_hour), int(op_end_hour)):
now_hour = datetime.now().hour
_auto_log_append(f"😴 当前{now_hour}时,不在运营时段({int(op_start_hour)}-{int(op_end_hour)}),休眠中...")
# 休眠到运营时间开始
for _ in range(300): # 5分钟检查一次
if not _auto_running.is_set():
break
time.sleep(1)
continue
# 检查错误冷却
if _is_in_cooldown():
remain = int(_error_cooldown_until - time.time())
if remain > 0 and remain % 30 == 0:
_auto_log_append(f"⏳ 错误冷却中,剩余 {remain}s")
time.sleep(5)
continue
# 自动评论
if comment_enabled and now >= next_comment:
try:
_auto_log_append("--- 🔄 执行自动评论 ---")
msg = auto_comment_once(keywords, mcp_url, model, persona_text)
_auto_log_append(msg)
except Exception as e:
_auto_log_append(f"❌ 自动评论异常: {e}")
interval = random.randint(int(comment_min) * 60, int(comment_max) * 60)
next_comment = time.time() + interval
_auto_log_append(f"⏰ 下次评论: {interval // 60} 分钟后")
_update_next_display()
# 自动点赞
if like_enabled and now >= next_like:
try:
_auto_log_append("--- 🔄 执行自动点赞 ---")
msg = auto_like_once(keywords, like_count_per_run, mcp_url)
_auto_log_append(msg)
except Exception as e:
_auto_log_append(f"❌ 自动点赞异常: {e}")
interval = random.randint(int(like_min) * 60, int(like_max) * 60)
next_like = time.time() + interval
_auto_log_append(f"⏰ 下次点赞: {interval // 60} 分钟后")
_update_next_display()
# 自动收藏
if favorite_enabled and now >= next_favorite:
try:
_auto_log_append("--- 🔄 执行自动收藏 ---")
msg = auto_favorite_once(keywords, fav_count_per_run, mcp_url)
_auto_log_append(msg)
except Exception as e:
_auto_log_append(f"❌ 自动收藏异常: {e}")
interval = random.randint(int(fav_min) * 60, int(fav_max) * 60)
next_favorite = time.time() + interval
_auto_log_append(f"⏰ 下次收藏: {interval // 60} 分钟后")
_update_next_display()
# 自动发布
if publish_enabled and now >= next_publish:
try:
_auto_log_append("--- 🔄 执行自动发布 ---")
msg = auto_publish_once(topics, mcp_url, sd_url_val, sd_model_name, model)
_auto_log_append(msg)
except Exception as e:
_auto_log_append(f"❌ 自动发布异常: {e}")
interval = random.randint(int(publish_min) * 60, int(publish_max) * 60)
next_publish = time.time() + interval
_auto_log_append(f"⏰ 下次发布: {interval // 60} 分钟后")
_update_next_display()
# 自动回复评论
if reply_enabled and now >= next_reply:
try:
_auto_log_append("--- 🔄 执行自动回复评论 ---")
msg = auto_reply_once(max_replies_per_run, mcp_url, model, persona_text)
_auto_log_append(msg)
except Exception as e:
_auto_log_append(f"❌ 自动回复异常: {e}")
interval = random.randint(int(reply_min) * 60, int(reply_max) * 60)
next_reply = time.time() + interval
_auto_log_append(f"⏰ 下次回复: {interval // 60} 分钟后")
_update_next_display()
# 每5秒检查一次停止信号
for _ in range(5):
if not _auto_running.is_set():
break
time.sleep(1)
_scheduler_next_times.clear()
_auto_log_append("🛑 自动化调度器已停止")
def start_scheduler(comment_on, publish_on, reply_on, like_on, favorite_on,
c_min, c_max, p_min, p_max, r_min, r_max,
max_replies_per_run,
l_min, l_max, like_count_per_run,
fav_min, fav_max, fav_count_per_run,
op_start_hour, op_end_hour,
keywords, topics, mcp_url, sd_url_val, sd_model_name,
model, persona_text):
"""启动定时自动化"""
global _auto_thread
if _auto_running.is_set():
return "⚠️ 调度器已在运行中,请先停止"
if not comment_on and not publish_on and not reply_on and not like_on and not favorite_on:
return "❌ 请至少启用一项自动化功能"
# 评论/回复需要 LLM点赞/收藏不需要
if (comment_on or reply_on):
api_key, _, _ = _get_llm_config()
if not api_key:
return "❌ LLM 未配置,请先在全局设置中配置提供商"
_auto_running.set()
_auto_thread = threading.Thread(
target=_scheduler_loop,
args=(comment_on, publish_on, reply_on, like_on, favorite_on,
c_min, c_max, p_min, p_max, r_min, r_max,
max_replies_per_run,
l_min, l_max, like_count_per_run,
fav_min, fav_max, fav_count_per_run,
op_start_hour, op_end_hour,
keywords, topics, mcp_url, sd_url_val, sd_model_name,
model, persona_text),
daemon=True,
)
_auto_thread.start()
parts = []
if comment_on:
parts.append(f"评论({int(c_min)}-{int(c_max)}分)")
if like_on:
parts.append(f"点赞({int(l_min)}-{int(l_max)}分, {int(like_count_per_run)}个/轮)")
if favorite_on:
parts.append(f"收藏({int(fav_min)}-{int(fav_max)}分, {int(fav_count_per_run)}个/轮)")
if publish_on:
parts.append(f"发布({int(p_min)}-{int(p_max)}分)")
if reply_on:
parts.append(f"回复({int(r_min)}-{int(r_max)}分, ≤{int(max_replies_per_run)}条/轮)")
_auto_log_append(f"调度器已启动: {' + '.join(parts)}")
return f"✅ 自动化已启动 🟢\n⏰ 运营时段: {int(op_start_hour)}:00-{int(op_end_hour)}:00\n任务: {' | '.join(parts)}\n\n💡 点击「刷新日志」查看实时进度"
def stop_scheduler():
"""停止定时自动化"""
if not _auto_running.is_set():
return "⚠️ 调度器未在运行"
_auto_running.clear()
_auto_log_append("⏹️ 收到停止信号,等待当前任务完成...")
return "🛑 调度器停止中...当前任务完成后将完全停止"
def get_auto_log():
"""获取自动化运行日志"""
if not _auto_log:
return "📋 暂无日志\n\n💡 点击「一键评论」「一键发布」或启动定时后日志将在此显示"
return "\n".join(_auto_log[-80:])
def get_scheduler_status():
"""获取调度器运行状态 + 下次执行时间 + 今日统计"""
_reset_daily_stats_if_needed()
if _auto_running.is_set():
lines = ["🟢 **调度器运行中**"]
if _scheduler_next_times:
next_info = " | ".join(f"{k}@{v}" for k, v in _scheduler_next_times.items())
lines.append(f"⏰ 下次: {next_info}")
s = _daily_stats
lines.append(
f"📊 今日: 💬{s.get('comments',0)} ❤️{s.get('likes',0)} "
f"{s.get('favorites',0)} 🚀{s.get('publishes',0)} "
f"💌{s.get('replies',0)}{s.get('errors',0)}"
)
if _is_in_cooldown():
lines.append(f"⏳ 冷却中,{int(_error_cooldown_until - time.time())}s 后恢复")
return "\n".join(lines)
return "⚪ **调度器未运行**"
# ==================================================
# Windows 开机自启管理
# ==================================================
_APP_NAME = "XHS_AI_AutoBot"
_STARTUP_REG_KEY = r"Software\Microsoft\Windows\CurrentVersion\Run"
def _get_startup_script_path() -> str:
"""获取启动脚本路径(.vbs 静默启动,不弹黑窗)"""
return os.path.join(os.path.dirname(os.path.abspath(__file__)), "_autostart.vbs")
def _get_startup_bat_path() -> str:
"""获取启动 bat 路径"""
return os.path.join(os.path.dirname(os.path.abspath(__file__)), "_autostart.bat")
def _create_startup_scripts():
"""创建静默启动脚本bat + vbs"""
app_dir = os.path.dirname(os.path.abspath(__file__))
venv_python = os.path.join(app_dir, ".venv", "Scripts", "pythonw.exe")
# 如果没有 pythonw退回 python.exe
if not os.path.exists(venv_python):
venv_python = os.path.join(app_dir, ".venv", "Scripts", "python.exe")
main_script = os.path.join(app_dir, "main.py")
# 创建 bat
bat_path = _get_startup_bat_path()
bat_content = f"""@echo off
cd /d "{app_dir}"
"{venv_python}" "{main_script}"
"""
with open(bat_path, "w", encoding="utf-8") as f:
f.write(bat_content)
# 创建 vbs静默运行 bat不弹出命令行窗口
vbs_path = _get_startup_script_path()
vbs_content = f"""Set WshShell = CreateObject("WScript.Shell")
WshShell.Run chr(34) & "{bat_path}" & chr(34), 0
Set WshShell = Nothing
"""
with open(vbs_path, "w", encoding="utf-8") as f:
f.write(vbs_content)
return vbs_path
def is_autostart_enabled() -> bool:
"""检查是否已设置开机自启"""
if platform.system() != "Windows":
return False
try:
import winreg
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, _STARTUP_REG_KEY, 0, winreg.KEY_READ)
try:
val, _ = winreg.QueryValueEx(key, _APP_NAME)
winreg.CloseKey(key)
return bool(val)
except FileNotFoundError:
winreg.CloseKey(key)
return False
except Exception:
return False
def enable_autostart() -> str:
"""启用 Windows 开机自启"""
if platform.system() != "Windows":
return "❌ 此功能仅支持 Windows 系统"
try:
import winreg
vbs_path = _create_startup_scripts()
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, _STARTUP_REG_KEY, 0, winreg.KEY_SET_VALUE)
# 用 wscript 运行 vbs 以实现静默启动
winreg.SetValueEx(key, _APP_NAME, 0, winreg.REG_SZ, f'wscript.exe "{vbs_path}"')
winreg.CloseKey(key)
logger.info(f"开机自启已启用: {vbs_path}")
return "✅ 开机自启已启用\n下次开机时将自动后台运行本程序"
except Exception as e:
logger.error(f"设置开机自启失败: {e}")
return f"❌ 设置失败: {e}"
def disable_autostart() -> str:
"""禁用 Windows 开机自启"""
if platform.system() != "Windows":
return "❌ 此功能仅支持 Windows 系统"
try:
import winreg
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, _STARTUP_REG_KEY, 0, winreg.KEY_SET_VALUE)
try:
winreg.DeleteValue(key, _APP_NAME)
except FileNotFoundError:
pass
winreg.CloseKey(key)
# 清理启动脚本
for f in [_get_startup_script_path(), _get_startup_bat_path()]:
if os.path.exists(f):
os.remove(f)
logger.info("开机自启已禁用")
return "✅ 开机自启已禁用"
except Exception as e:
logger.error(f"禁用开机自启失败: {e}")
return f"❌ 禁用失败: {e}"
def toggle_autostart(enabled: bool) -> str:
"""切换开机自启状态(供 UI 调用)"""
if enabled:
return enable_autostart()
else:
return disable_autostart()
# ==================================================
# UI 构建
# ==================================================
config = cfg.all
with gr.Blocks(
title="小红书 AI 爆文工坊 V2.0",
theme=gr.themes.Soft(),
css="""
.status-ok { color: #16a34a; font-weight: bold; }
.status-err { color: #dc2626; font-weight: bold; }
footer { display: none !important; }
""",
) as app:
gr.Markdown(
"# 🍒 小红书 AI 爆文生产工坊 V2.0\n"
"> 灵感 → 文案 → 绘图 → 发布 → 运营,一站式全闭环"
)
# 全局状态
state_images = gr.State([])
state_search_result = gr.State("")
# ============ 全局设置栏 ============
with gr.Accordion("⚙️ 全局设置 (自动保存)", open=False):
gr.Markdown("#### 🤖 LLM 提供商 (支持所有 OpenAI 兼容接口)")
with gr.Row():
llm_provider = gr.Dropdown(
label="选择 LLM 提供商",
choices=cfg.get_llm_provider_names(),
value=cfg.get("active_llm", ""),
interactive=True, scale=2,
)
btn_connect_llm = gr.Button("🔗 连接 LLM", size="sm", scale=1)
with gr.Row():
llm_model = gr.Dropdown(
label="LLM 模型", value=config["model"],
allow_custom_value=True, interactive=True, scale=2,
)
llm_provider_info = gr.Markdown(
value="*选择提供商后显示详情*",
)
with gr.Accordion(" 添加 / 管理 LLM 提供商", open=False):
with gr.Row():
new_provider_name = gr.Textbox(
label="名称", placeholder="如: DeepSeek / GPT-4o / 通义千问",
scale=1,
)
new_provider_key = gr.Textbox(
label="API Key", type="password", scale=2,
)
new_provider_url = gr.Textbox(
label="Base URL", placeholder="https://api.openai.com/v1",
value="https://api.openai.com/v1", scale=2,
)
with gr.Row():
btn_add_provider = gr.Button("✅ 添加提供商", variant="primary", size="sm")
btn_del_provider = gr.Button("🗑️ 删除当前提供商", variant="stop", size="sm")
provider_mgmt_status = gr.Markdown("")
gr.Markdown("---")
with gr.Row():
mcp_url = gr.Textbox(
label="MCP Server URL", value=config["mcp_url"], scale=2,
)
sd_url = gr.Textbox(
label="SD WebUI URL", value=config["sd_url"], scale=2,
)
with gr.Row():
persona = gr.Dropdown(
label="博主人设(评论/回复/自动运营通用)",
choices=[RANDOM_PERSONA_LABEL] + DEFAULT_PERSONAS,
value=config.get("persona", RANDOM_PERSONA_LABEL),
allow_custom_value=True,
interactive=True,
scale=5,
)
with gr.Row():
btn_connect_sd = gr.Button("🎨 连接 SD", size="sm")
btn_check_mcp = gr.Button("📡 检查 MCP", size="sm")
with gr.Row():
sd_model = gr.Dropdown(
label="SD 模型", allow_custom_value=True,
interactive=True, scale=2,
)
status_bar = gr.Markdown("🔄 等待连接...")
gr.Markdown("---")
gr.Markdown("#### 🖥️ 系统设置")
with gr.Row():
autostart_toggle = gr.Checkbox(
label="🚀 Windows 开机自启(静默后台运行)",
value=is_autostart_enabled(),
interactive=(platform.system() == "Windows"),
)
autostart_status = gr.Markdown(
value="✅ 已启用" if is_autostart_enabled() else "⚪ 未启用",
)
# ============ Tab 页面 ============
with gr.Tabs():
# -------- Tab 1: 内容创作 --------
with gr.Tab("✨ 内容创作"):
with gr.Row():
# 左栏:输入
with gr.Column(scale=1):
gr.Markdown("### 💡 构思")
topic = gr.Textbox(label="笔记主题", placeholder="例如:优衣库早春穿搭")
style = gr.Dropdown(
DEFAULT_STYLES,
label="风格", value="好物种草",
)
btn_gen_copy = gr.Button("✨ 第一步:生成文案", variant="primary")
gr.Markdown("---")
gr.Markdown("### 🎨 绘图参数")
with gr.Accordion("高级设置", open=False):
neg_prompt = gr.Textbox(
label="反向提示词", value=DEFAULT_NEGATIVE, lines=2,
)
steps = gr.Slider(15, 50, value=25, step=1, label="步数")
cfg_scale = gr.Slider(1, 15, value=7, step=0.5, label="CFG Scale")
btn_gen_img = gr.Button("🎨 第二步:生成图片", variant="primary")
# 中栏:文案编辑
with gr.Column(scale=1):
gr.Markdown("### 📝 文案编辑")
res_title = gr.Textbox(label="标题 (≤20字)", interactive=True)
res_content = gr.TextArea(
label="正文 (可手动修改)", lines=12, interactive=True,
)
res_prompt = gr.TextArea(
label="绘图提示词", lines=3, interactive=True,
)
res_tags = gr.Textbox(
label="话题标签 (逗号分隔)", interactive=True,
placeholder="穿搭, 春季, 好物种草",
)
# 右栏:预览 & 发布
with gr.Column(scale=1):
gr.Markdown("### 🖼️ 视觉预览")
gallery = gr.Gallery(label="AI 生成图片", columns=2, height=300)
local_images = gr.File(
label="📁 上传本地图片(可混排)",
file_count="multiple",
file_types=["image"],
)
gr.Markdown("### 🚀 发布")
schedule_time = gr.Textbox(
label="定时发布 (可选, ISO8601格式)",
placeholder="如 2026-02-08T18:00:00+08:00留空=立即发布",
)
with gr.Row():
btn_export = gr.Button("📂 导出本地", variant="secondary")
btn_publish = gr.Button("🚀 发布到小红书", variant="primary")
publish_msg = gr.Markdown("")
# -------- Tab 2: 热点探测 --------
with gr.Tab("🔥 热点探测"):
gr.Markdown("### 搜索热门内容 → AI 分析趋势 → 一键借鉴创作")
with gr.Row():
with gr.Column(scale=1):
hot_keyword = gr.Textbox(
label="搜索关键词", placeholder="如:春季穿搭",
)
hot_sort = gr.Dropdown(
["综合", "最新", "最多点赞", "最多评论", "最多收藏"],
label="排序", value="综合",
)
btn_search = gr.Button("🔍 搜索", variant="primary")
search_status = gr.Markdown("")
with gr.Column(scale=2):
search_output = gr.TextArea(
label="搜索结果", lines=12, interactive=False,
)
with gr.Row():
btn_analyze = gr.Button("🧠 AI 分析热点趋势", variant="primary")
analysis_status = gr.Markdown("")
analysis_output = gr.Markdown(label="分析报告")
topic_from_hot = gr.Textbox(
label="选择/输入创作选题", placeholder="基于分析选一个方向",
)
with gr.Row():
hot_style = gr.Dropdown(
["好物种草", "干货教程", "情绪共鸣", "生活Vlog", "测评避雷"],
label="风格", value="好物种草",
)
btn_gen_from_hot = gr.Button("✨ 基于热点生成文案", variant="primary")
with gr.Row():
hot_title = gr.Textbox(label="生成的标题", interactive=True)
hot_content = gr.TextArea(label="生成的正文", lines=8, interactive=True)
with gr.Row():
hot_prompt = gr.TextArea(label="绘图提示词", lines=3, interactive=True)
hot_tags = gr.Textbox(label="标签", interactive=True)
hot_gen_status = gr.Markdown("")
btn_sync_to_create = gr.Button(
"📋 同步到「内容创作」Tab → 绘图 & 发布",
variant="primary",
)
# -------- Tab 3: 评论管家 --------
with gr.Tab("💬 评论管家"):
gr.Markdown("### 智能评论管理:主动评论引流 & 自动回复粉丝")
with gr.Tabs():
# ======== 子 Tab A: 主动评论他人 ========
with gr.Tab("✍️ 主动评论引流"):
gr.Markdown(
"> **流程**:搜索/浏览笔记 → 选择目标 → 加载内容 → "
"AI 分析笔记+已有评论自动生成高质量评论 → 一键发送"
)
# 笔记选择器
with gr.Row():
pro_keyword = gr.Textbox(
label="🔍 搜索关键词 (留空则获取推荐)",
placeholder="穿搭、美食、旅行…",
)
btn_pro_fetch = gr.Button("🔍 获取笔记", variant="primary")
with gr.Row():
pro_selector = gr.Dropdown(
label="📋 选择目标笔记",
choices=[], interactive=True,
)
pro_fetch_status = gr.Markdown("")
# 隐藏字段
with gr.Row():
pro_feed_id = gr.Textbox(label="笔记 ID", interactive=False)
pro_xsec_token = gr.Textbox(label="xsec_token", interactive=False)
pro_title = gr.Textbox(label="标题", interactive=False)
# 加载内容 & AI 分析
btn_pro_load = gr.Button("📖 加载笔记内容", variant="secondary")
pro_load_status = gr.Markdown("")
with gr.Row():
with gr.Column(scale=1):
pro_content = gr.TextArea(
label="📄 笔记正文摘要", lines=8, interactive=False,
)
with gr.Column(scale=1):
pro_comments = gr.TextArea(
label="💬 已有评论", lines=8, interactive=False,
)
# 隐藏: 完整文本
pro_full_text = gr.Textbox(visible=False)
gr.Markdown("---")
with gr.Row():
with gr.Column(scale=1):
btn_pro_ai = gr.Button(
"🤖 AI 智能生成评论", variant="primary", size="lg",
)
pro_ai_status = gr.Markdown("")
with gr.Column(scale=2):
pro_comment_text = gr.TextArea(
label="✏️ 评论内容 (可手动修改)", lines=3,
interactive=True,
placeholder="点击左侧按钮自动生成,也可手动编写",
)
with gr.Row():
btn_pro_send = gr.Button("📩 发送评论", variant="primary")
pro_send_status = gr.Markdown("")
# ======== 子 Tab B: 回复我的评论 ========
with gr.Tab("💌 回复粉丝评论"):
gr.Markdown(
"> **流程**:选择我的笔记 → 加载评论 → "
"粘贴要回复的评论 → AI 生成回复 → 一键发送"
)
# 笔记选择器 (自动用已保存的 userId 获取)
with gr.Row():
btn_my_fetch = gr.Button("🔍 获取我的笔记", variant="primary")
with gr.Row():
my_selector = gr.Dropdown(
label="📋 选择我的笔记",
choices=[], interactive=True,
)
my_fetch_status = gr.Markdown("")
with gr.Row():
my_feed_id = gr.Textbox(label="笔记 ID", interactive=False)
my_xsec_token = gr.Textbox(label="xsec_token", interactive=False)
my_title = gr.Textbox(label="笔记标题", interactive=False)
btn_my_load_comments = gr.Button("📥 加载评论", variant="primary")
my_comment_status = gr.Markdown("")
my_comments_display = gr.TextArea(
label="📋 粉丝评论列表", lines=12, interactive=False,
)
gr.Markdown("---")
gr.Markdown("#### 📝 回复评论")
with gr.Row():
with gr.Column(scale=1):
my_target_comment = gr.TextArea(
label="要回复的评论内容", lines=3,
placeholder="从上方评论列表中复制粘贴要回复的评论",
)
btn_my_ai_reply = gr.Button(
"🤖 AI 生成回复", variant="secondary",
)
my_reply_gen_status = gr.Markdown("")
with gr.Column(scale=1):
my_reply_content = gr.TextArea(
label="回复内容 (可修改)", lines=3,
interactive=True,
)
btn_my_send_reply = gr.Button(
"📩 发送回复", variant="primary",
)
my_reply_status = gr.Markdown("")
# -------- Tab 4: 账号登录 --------
with gr.Tab("🔐 账号登录"):
gr.Markdown(
"### 小红书账号登录\n"
"> 扫码登录后自动获取 xsec_token配合用户 ID 即可使用所有功能"
)
with gr.Row():
with gr.Column(scale=1):
gr.Markdown(
"**操作步骤:**\n"
"1. 确保 MCP 服务已启动\n"
"2. 点击「获取登录二维码」→ 用小红书 App 扫码\n"
"3. 点击「检查登录状态」→ 自动获取并保存 xsec_token\n"
"4. 首次使用请填写你的用户 ID 并点击保存\n\n"
"⚠️ 登录后不要在其他网页端登录同一账号,否则会被踢出"
)
btn_get_qrcode = gr.Button(
"📱 获取登录二维码", variant="primary", size="lg",
)
btn_check_login = gr.Button(
"🔍 检查登录状态 (自动获取 Token)",
variant="secondary", size="lg",
)
btn_logout = gr.Button(
"🚪 退出登录 (重新扫码)",
variant="stop", size="lg",
)
login_status = gr.Markdown("🔄 等待操作...")
gr.Markdown("---")
gr.Markdown(
"#### 📌 我的账号信息\n"
"> **注意**: 小红书号 ≠ 用户 ID\n"
"> - **小红书号 (redId)**: 如 `18688457507`,是你在 App 个人页看到的\n"
"> - **用户 ID (userId)**: 如 `5a695db6e8ac2b72e8af2a53`24位十六进制字符串\n\n"
"💡 **如何获取 userId?**\n"
"1. 用浏览器打开你的小红书主页\n"
"2. 网址格式为: `xiaohongshu.com/user/profile/xxxxxxxx`\n"
"3. `profile/` 后面的就是你的 userId"
)
login_user_id = gr.Textbox(
label="我的用户 ID (24位 userId, 非小红书号)",
value=config.get("my_user_id", ""),
placeholder="如: 5a695db6e8ac2b72e8af2a53",
)
login_xsec_token = gr.Textbox(
label="xsec_token (登录后自动获取)",
value=config.get("xsec_token", ""),
interactive=False,
)
btn_save_uid = gr.Button(
"💾 保存用户 ID", variant="secondary",
)
save_uid_status = gr.Markdown("")
with gr.Column(scale=1):
qr_image = gr.Image(
label="扫码登录", height=350, width=350,
)
# -------- Tab 5: 数据看板 --------
with gr.Tab("📊 数据看板"):
gr.Markdown(
"### 我的账号数据看板\n"
"> 用户 ID 和 xsec_token 从「账号登录」自动获取,直接点击加载即可"
)
with gr.Row():
with gr.Column(scale=1):
data_user_id = gr.Textbox(
label="我的用户 ID (自动填充)",
value=config.get("my_user_id", ""),
interactive=True,
)
data_xsec_token = gr.Textbox(
label="xsec_token (自动填充)",
value=config.get("xsec_token", ""),
interactive=True,
)
btn_refresh_token = gr.Button(
"🔄 刷新 Token", variant="secondary",
)
btn_load_my_data = gr.Button(
"📊 加载我的数据", variant="primary", size="lg",
)
data_status = gr.Markdown("")
with gr.Column(scale=2):
profile_card = gr.Markdown(
value="*等待加载...*",
label="账号概览",
)
gr.Markdown("---")
gr.Markdown("### 📈 数据可视化")
with gr.Row():
with gr.Column(scale=1):
chart_interact = gr.Plot(label="📊 核心指标")
with gr.Column(scale=2):
chart_notes = gr.Plot(label="❤ 笔记点赞排行")
gr.Markdown("---")
notes_detail = gr.Markdown(
value="*加载数据后显示笔记明细表格*",
label="笔记数据明细",
)
# -------- Tab 6: 自动运营 --------
with gr.Tab("🤖 自动运营"):
gr.Markdown(
"### 🤖 无人值守自动化运营\n"
"> 一键评论引流 + 一键点赞 + 一键收藏 + 一键回复 + 一键发布 + 随机定时全自动\n\n"
"⚠️ **注意**: 请确保已连接 LLM、SD WebUI 和 MCP 服务"
)
with gr.Row():
# 左栏: 一键操作
with gr.Column(scale=1):
gr.Markdown("#### 💬 一键智能评论")
gr.Markdown(
"> 自动搜索高赞笔记 → AI 分析内容 → 生成评论 → 发送\n"
"每次随机选关键词搜索,从结果中随机选笔记"
)
auto_comment_keywords = gr.Textbox(
label="评论关键词池 (逗号分隔)",
value=", ".join(DEFAULT_COMMENT_KEYWORDS),
placeholder="关键词1, 关键词2, ...",
)
btn_auto_comment = gr.Button(
"💬 一键评论 (单次)", variant="primary", size="lg",
)
auto_comment_result = gr.Markdown("")
gr.Markdown("---")
gr.Markdown("#### 👍 一键自动点赞")
gr.Markdown(
"> 搜索笔记 → 随机选择多篇 → 依次点赞\n"
"提升账号活跃度,无需 LLM"
)
auto_like_count = gr.Number(
label="单次点赞数量", value=5, minimum=1, maximum=20,
)
btn_auto_like = gr.Button(
"👍 一键点赞 (单次)", variant="primary", size="lg",
)
auto_like_result = gr.Markdown("")
gr.Markdown("---")
gr.Markdown("#### ⭐ 一键自动收藏")
gr.Markdown(
"> 搜索笔记 → 随机选择多篇 → 依次收藏\n"
"提升账号活跃度,与点赞互补"
)
auto_fav_count = gr.Number(
label="单次收藏数量", value=3, minimum=1, maximum=15,
)
btn_auto_favorite = gr.Button(
"⭐ 一键收藏 (单次)", variant="primary", size="lg",
)
auto_favorite_result = gr.Markdown("")
gr.Markdown("---")
gr.Markdown("#### 💌 一键自动回复")
gr.Markdown(
"> 扫描我的所有笔记 → 找到粉丝评论 → AI 生成回复 → 逐条发送\n"
"自动跳过自己的评论,模拟真人间隔回复"
)
auto_reply_max = gr.Number(
label="单次最多回复条数", value=5, minimum=1, maximum=20,
)
btn_auto_reply = gr.Button(
"💌 一键回复 (单次)", variant="primary", size="lg",
)
auto_reply_result = gr.Markdown("")
gr.Markdown("---")
gr.Markdown("#### 🚀 一键智能发布")
gr.Markdown(
"> 随机选主题+风格 → AI 生成文案 → SD 生成图片 → 自动发布"
)
auto_publish_topics = gr.Textbox(
label="主题池 (逗号分隔)",
value=", ".join(random.sample(DEFAULT_TOPICS, min(15, len(DEFAULT_TOPICS)))),
placeholder="主题会从池中随机选取,可自行修改",
)
btn_auto_publish = gr.Button(
"🚀 一键发布 (单次)", variant="primary", size="lg",
)
auto_publish_result = gr.Markdown("")
# 右栏: 定时自动化
with gr.Column(scale=1):
gr.Markdown("#### ⏰ 随机定时自动化")
gr.Markdown(
"> 设置时间间隔后启动,系统将在随机时间自动执行\n"
"> 模拟真人操作节奏,降低被检测风险"
)
sched_status = gr.Markdown("⚪ **调度器未运行**")
# 运营时段设置
with gr.Group():
gr.Markdown("##### ⏰ 运营时段")
with gr.Row():
sched_start_hour = gr.Number(
label="开始时间(整点)", value=8, minimum=0, maximum=23,
)
sched_end_hour = gr.Number(
label="结束时间(整点)", value=23, minimum=1, maximum=24,
)
with gr.Group():
sched_comment_on = gr.Checkbox(
label="✅ 启用自动评论", value=True,
)
with gr.Row():
sched_c_min = gr.Number(
label="评论最小间隔(分钟)", value=15, minimum=5,
)
sched_c_max = gr.Number(
label="评论最大间隔(分钟)", value=45, minimum=10,
)
with gr.Group():
sched_like_on = gr.Checkbox(
label="✅ 启用自动点赞", value=True,
)
with gr.Row():
sched_l_min = gr.Number(
label="点赞最小间隔(分钟)", value=10, minimum=3,
)
sched_l_max = gr.Number(
label="点赞最大间隔(分钟)", value=30, minimum=5,
)
sched_like_count = gr.Number(
label="每轮点赞数量", value=5, minimum=1, maximum=15,
)
with gr.Group():
sched_fav_on = gr.Checkbox(
label="✅ 启用自动收藏", value=True,
)
with gr.Row():
sched_fav_min = gr.Number(
label="收藏最小间隔(分钟)", value=12, minimum=3,
)
sched_fav_max = gr.Number(
label="收藏最大间隔(分钟)", value=35, minimum=5,
)
sched_fav_count = gr.Number(
label="每轮收藏数量", value=3, minimum=1, maximum=10,
)
with gr.Group():
sched_reply_on = gr.Checkbox(
label="✅ 启用自动回复评论", value=True,
)
with gr.Row():
sched_r_min = gr.Number(
label="回复最小间隔(分钟)", value=20, minimum=5,
)
sched_r_max = gr.Number(
label="回复最大间隔(分钟)", value=60, minimum=10,
)
sched_reply_max = gr.Number(
label="每轮最多回复条数", value=3, minimum=1, maximum=10,
)
with gr.Group():
sched_publish_on = gr.Checkbox(
label="✅ 启用自动发布", value=True,
)
with gr.Row():
sched_p_min = gr.Number(
label="发布最小间隔(分钟)", value=60, minimum=30,
)
sched_p_max = gr.Number(
label="发布最大间隔(分钟)", value=180, minimum=60,
)
with gr.Row():
btn_start_sched = gr.Button(
"▶️ 启动定时", variant="primary", size="lg",
)
btn_stop_sched = gr.Button(
"⏹️ 停止定时", variant="stop", size="lg",
)
sched_result = gr.Markdown("")
gr.Markdown("---")
with gr.Row():
with gr.Column(scale=2):
gr.Markdown("#### 📋 运行日志")
with gr.Row():
btn_refresh_log = gr.Button("🔄 刷新日志", size="sm")
btn_clear_log = gr.Button("🗑️ 清空日志", size="sm")
btn_refresh_stats = gr.Button("📊 刷新统计", size="sm")
auto_log_display = gr.TextArea(
label="自动化运行日志",
value="📋 暂无日志\n\n💡 执行操作后日志将在此显示",
lines=15,
interactive=False,
)
with gr.Column(scale=1):
gr.Markdown("#### 📊 今日运营统计")
auto_stats_display = gr.Markdown(
value=_get_stats_summary(),
)
# ==================================================
# 事件绑定
# ==================================================
# ---- 全局设置: LLM 提供商管理 ----
btn_connect_llm.click(
fn=connect_llm, inputs=[llm_provider],
outputs=[llm_model, status_bar],
)
llm_provider.change(
fn=on_provider_selected,
inputs=[llm_provider],
outputs=[llm_provider_info],
)
btn_add_provider.click(
fn=add_llm_provider,
inputs=[new_provider_name, new_provider_key, new_provider_url],
outputs=[llm_provider, provider_mgmt_status],
)
btn_del_provider.click(
fn=remove_llm_provider,
inputs=[llm_provider],
outputs=[llm_provider, provider_mgmt_status],
)
btn_connect_sd.click(
fn=connect_sd, inputs=[sd_url],
outputs=[sd_model, status_bar],
)
btn_check_mcp.click(
fn=check_mcp_status, inputs=[mcp_url],
outputs=[status_bar],
)
# ---- Tab 1: 内容创作 ----
btn_gen_copy.click(
fn=generate_copy,
inputs=[llm_model, topic, style],
outputs=[res_title, res_content, res_prompt, res_tags, status_bar],
)
btn_gen_img.click(
fn=generate_images,
inputs=[sd_url, res_prompt, neg_prompt, sd_model, steps, cfg_scale],
outputs=[gallery, state_images, status_bar],
)
btn_export.click(
fn=one_click_export,
inputs=[res_title, res_content, state_images],
outputs=[publish_msg],
)
btn_publish.click(
fn=publish_to_xhs,
inputs=[res_title, res_content, res_tags, state_images,
local_images, mcp_url, schedule_time],
outputs=[publish_msg],
)
# ---- Tab 2: 热点探测 ----
btn_search.click(
fn=search_hotspots,
inputs=[hot_keyword, hot_sort, mcp_url],
outputs=[search_status, search_output],
)
# 搜索结果同步到 state
search_output.change(
fn=lambda x: x, inputs=[search_output], outputs=[state_search_result],
)
btn_analyze.click(
fn=analyze_and_suggest,
inputs=[llm_model, hot_keyword, search_output],
outputs=[analysis_status, analysis_output, topic_from_hot],
)
btn_gen_from_hot.click(
fn=generate_from_hotspot,
inputs=[llm_model, topic_from_hot, hot_style, search_output],
outputs=[hot_title, hot_content, hot_prompt, hot_tags, hot_gen_status],
)
# 同步热点文案到内容创作 Tab
btn_sync_to_create.click(
fn=lambda t, c, p, tg: (t, c, p, tg, "✅ 已同步到「内容创作」,可切换 Tab 继续绘图和发布"),
inputs=[hot_title, hot_content, hot_prompt, hot_tags],
outputs=[res_title, res_content, res_prompt, res_tags, status_bar],
)
# ---- Tab 3: 评论管家 ----
# == 子 Tab A: 主动评论引流 ==
btn_pro_fetch.click(
fn=fetch_proactive_notes,
inputs=[pro_keyword, mcp_url],
outputs=[pro_selector, pro_fetch_status],
)
pro_selector.change(
fn=on_proactive_note_selected,
inputs=[pro_selector],
outputs=[pro_feed_id, pro_xsec_token, pro_title],
)
btn_pro_load.click(
fn=load_note_for_comment,
inputs=[pro_feed_id, pro_xsec_token, mcp_url],
outputs=[pro_load_status, pro_content, pro_comments, pro_full_text],
)
btn_pro_ai.click(
fn=ai_generate_comment,
inputs=[llm_model, persona,
pro_title, pro_content, pro_comments],
outputs=[pro_comment_text, pro_ai_status],
)
btn_pro_send.click(
fn=send_comment,
inputs=[pro_feed_id, pro_xsec_token, pro_comment_text, mcp_url],
outputs=[pro_send_status],
)
# == 子 Tab B: 回复粉丝评论 ==
btn_my_fetch.click(
fn=fetch_my_notes,
inputs=[mcp_url],
outputs=[my_selector, my_fetch_status],
)
my_selector.change(
fn=on_my_note_selected,
inputs=[my_selector],
outputs=[my_feed_id, my_xsec_token, my_title],
)
btn_my_load_comments.click(
fn=fetch_my_note_comments,
inputs=[my_feed_id, my_xsec_token, mcp_url],
outputs=[my_comment_status, my_comments_display],
)
btn_my_ai_reply.click(
fn=ai_reply_comment,
inputs=[llm_model, persona,
my_title, my_target_comment],
outputs=[my_reply_content, my_reply_gen_status],
)
btn_my_send_reply.click(
fn=send_reply,
inputs=[my_feed_id, my_xsec_token, my_reply_content, mcp_url],
outputs=[my_reply_status],
)
# ---- Tab 4: 账号登录 ----
btn_get_qrcode.click(
fn=get_login_qrcode,
inputs=[mcp_url],
outputs=[qr_image, login_status],
)
btn_check_login.click(
fn=check_login,
inputs=[mcp_url],
outputs=[login_status, login_user_id, login_xsec_token],
)
btn_logout.click(
fn=logout_xhs,
inputs=[mcp_url],
outputs=[login_status],
)
btn_save_uid.click(
fn=save_my_user_id,
inputs=[login_user_id],
outputs=[save_uid_status],
)
# ---- Tab 5: 数据看板 ----
def refresh_xsec_token(mcp_url):
token = _auto_fetch_xsec_token(mcp_url)
if token:
cfg.set("xsec_token", token)
return gr.update(value=token), "✅ Token 已刷新"
return gr.update(value=cfg.get("xsec_token", "")), "❌ 刷新失败,请确认已登录"
btn_refresh_token.click(
fn=refresh_xsec_token,
inputs=[mcp_url],
outputs=[data_xsec_token, data_status],
)
btn_load_my_data.click(
fn=fetch_my_profile,
inputs=[data_user_id, data_xsec_token, mcp_url],
outputs=[data_status, profile_card, chart_interact, chart_notes, notes_detail],
)
# ---- Tab 6: 自动运营 ----
btn_auto_comment.click(
fn=_auto_comment_with_log,
inputs=[auto_comment_keywords, mcp_url, llm_model, persona],
outputs=[auto_comment_result, auto_log_display],
)
btn_auto_like.click(
fn=_auto_like_with_log,
inputs=[auto_comment_keywords, auto_like_count, mcp_url],
outputs=[auto_like_result, auto_log_display],
)
btn_auto_favorite.click(
fn=_auto_favorite_with_log,
inputs=[auto_comment_keywords, auto_fav_count, mcp_url],
outputs=[auto_favorite_result, auto_log_display],
)
btn_auto_reply.click(
fn=_auto_reply_with_log,
inputs=[auto_reply_max, mcp_url, llm_model, persona],
outputs=[auto_reply_result, auto_log_display],
)
btn_auto_publish.click(
fn=_auto_publish_with_log,
inputs=[auto_publish_topics, mcp_url, sd_url, sd_model, llm_model],
outputs=[auto_publish_result, auto_log_display],
)
btn_start_sched.click(
fn=start_scheduler,
inputs=[sched_comment_on, sched_publish_on, sched_reply_on, sched_like_on,
sched_fav_on,
sched_c_min, sched_c_max, sched_p_min, sched_p_max,
sched_r_min, sched_r_max, sched_reply_max,
sched_l_min, sched_l_max, sched_like_count,
sched_fav_min, sched_fav_max, sched_fav_count,
sched_start_hour, sched_end_hour,
auto_comment_keywords, auto_publish_topics,
mcp_url, sd_url, sd_model, llm_model, persona],
outputs=[sched_result],
)
btn_stop_sched.click(
fn=stop_scheduler,
inputs=[],
outputs=[sched_result],
)
btn_refresh_log.click(
fn=lambda: (get_auto_log(), get_scheduler_status()),
inputs=[],
outputs=[auto_log_display, sched_status],
)
btn_clear_log.click(
fn=lambda: (_auto_log.clear() or "📋 日志已清空"),
inputs=[],
outputs=[auto_log_display],
)
btn_refresh_stats.click(
fn=lambda: (get_scheduler_status(), _get_stats_summary()),
inputs=[],
outputs=[sched_status, auto_stats_display],
)
# ---- 开机自启 ----
autostart_toggle.change(
fn=toggle_autostart,
inputs=[autostart_toggle],
outputs=[autostart_status],
)
# ---- 启动时自动刷新 SD ----
app.load(fn=connect_sd, inputs=[sd_url], outputs=[sd_model, status_bar])
# ==================================================
if __name__ == "__main__":
logger.info("🍒 小红书 AI 爆文工坊 V2.0 启动中...")
app.launch(
server_name=os.environ.get("GRADIO_SERVER_NAME", "127.0.0.1"),
server_port=int(os.environ.get("GRADIO_SERVER_PORT", "7860")),
inbrowser=True,
share=False,
)