""" 小红书 AI 爆文生产工坊 V2.0 全自动工作台:灵感 -> 文案 -> 绘图 -> 发布 -> 运营 """ import gradio as gr import os import re import json import time import logging import platform import subprocess import threading import random from datetime import datetime from PIL import Image import matplotlib import matplotlib.pyplot as plt from config_manager import ConfigManager, OUTPUT_DIR from llm_service import LLMService from sd_service import SDService, DEFAULT_NEGATIVE, FACE_IMAGE_PATH, SD_PRESET_NAMES, get_sd_preset, get_model_profile, get_model_profile_info, detect_model_profile, SD_MODEL_PROFILES from mcp_client import MCPClient, get_mcp_client from analytics_service import AnalyticsService # ================= matplotlib 中文字体配置 ================= _font_candidates = ["Microsoft YaHei", "SimHei", "PingFang SC", "WenQuanYi Micro Hei"] for _fn in _font_candidates: try: matplotlib.font_manager.findfont(_fn, fallback_to_default=False) plt.rcParams["font.sans-serif"] = [_fn] break except Exception: continue plt.rcParams["axes.unicode_minus"] = False # ================= 日志配置 ================= logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ logging.StreamHandler(), logging.FileHandler("autobot.log", encoding="utf-8"), ], ) logger = logging.getLogger("autobot") # 强制不走代理连接本地服务 os.environ["NO_PROXY"] = "127.0.0.1,localhost" # ================= 全局服务初始化 ================= cfg = ConfigManager() cfg.ensure_workspace() mcp = get_mcp_client(cfg.get("mcp_url", "http://localhost:18060/mcp")) analytics = AnalyticsService(OUTPUT_DIR) # ================================================== # LLM 多提供商管理 # ================================================== def _get_llm_config() -> tuple[str, str, str]: """获取当前激活 LLM 的 (api_key, base_url, model)""" p = cfg.get_active_llm() if p: return p["api_key"], p["base_url"], cfg.get("model", "") return "", "", "" def connect_llm(provider_name): """连接选中的 LLM 提供商并获取模型列表""" if not provider_name: return gr.update(choices=[], value=None), "⚠️ 请先选择或添加 LLM 提供商" cfg.set_active_llm(provider_name) p = cfg.get_active_llm() if not p: return gr.update(choices=[], value=None), "❌ 未找到该提供商配置" try: svc = LLMService(p["api_key"], p["base_url"]) models = svc.get_models() if models: return ( gr.update(choices=models, value=models[0]), f"✅ 已连接「{provider_name}」,加载 {len(models)} 个模型", ) else: # API 无法获取模型列表,保留手动输入 current_model = cfg.get("model", "") return ( gr.update(choices=[current_model] if current_model else [], value=current_model or None), f"⚠️ 已连接「{provider_name}」,但未获取到模型列表,请手动输入模型名", ) except Exception as e: logger.error("LLM 连接失败: %s", e) current_model = cfg.get("model", "") return ( gr.update(choices=[current_model] if current_model else [], value=current_model or None), f"❌ 连接「{provider_name}」失败: {e}", ) def add_llm_provider(name, api_key, base_url): """添加新的 LLM 提供商""" msg = cfg.add_llm_provider(name, api_key, base_url) names = cfg.get_llm_provider_names() active = cfg.get("active_llm", "") return ( gr.update(choices=names, value=active), msg, ) def remove_llm_provider(provider_name): """删除 LLM 提供商""" if not provider_name: return gr.update(choices=cfg.get_llm_provider_names(), value=cfg.get("active_llm", "")), "⚠️ 请先选择要删除的提供商" msg = cfg.remove_llm_provider(provider_name) names = cfg.get_llm_provider_names() active = cfg.get("active_llm", "") return ( gr.update(choices=names, value=active), msg, ) def on_provider_selected(provider_name): """切换 LLM 提供商时更新显示信息""" if not provider_name: return "未选择提供商" for p in cfg.get_llm_providers(): if p["name"] == provider_name: cfg.set_active_llm(provider_name) masked_key = p["api_key"][:8] + "***" if len(p["api_key"]) > 8 else "***" return f"**{provider_name}** \nAPI Key: `{masked_key}` \nBase URL: `{p['base_url']}`" return "未找到该提供商" # ================================================== # Tab 1: 内容创作 # ================================================== def connect_sd(sd_url): """连接 SD 并获取模型列表""" try: svc = SDService(sd_url) ok, msg = svc.check_connection() if ok: models = svc.get_models() cfg.set("sd_url", sd_url) first = models[0] if models else None info = get_model_profile_info(first) if first else "未检测到模型" return gr.update(choices=models, value=first), f"✅ {msg}", info return gr.update(choices=[]), f"❌ {msg}", "" except Exception as e: logger.error("SD 连接失败: %s", e) return gr.update(choices=[]), f"❌ SD 连接失败: {e}", "" def on_sd_model_change(model_name): """SD 模型切换时显示模型档案信息""" if not model_name: return "未选择模型" return get_model_profile_info(model_name) def check_mcp_status(mcp_url): """检查 MCP 连接状态""" try: client = get_mcp_client(mcp_url) ok, msg = client.check_connection() if ok: cfg.set("mcp_url", mcp_url) return f"✅ MCP 服务正常 - {msg}" return f"❌ {msg}" except Exception as e: return f"❌ MCP 连接失败: {e}" # ================================================== # 小红书账号登录 # ================================================== def get_login_qrcode(mcp_url): """获取小红书登录二维码""" try: client = get_mcp_client(mcp_url) result = client.get_login_qrcode() if "error" in result: return None, f"❌ 获取二维码失败: {result['error']}" qr_image = result.get("qr_image") msg = result.get("text", "") if qr_image: return qr_image, f"✅ 二维码已生成,请用小红书 App 扫码\n{msg}" return None, f"⚠️ 未获取到二维码图片,MCP 返回:\n{msg}" except Exception as e: logger.error("获取登录二维码失败: %s", e) return None, f"❌ 获取二维码失败: {e}" def logout_xhs(mcp_url): """退出登录:清除 cookies 并重置本地 token""" try: client = get_mcp_client(mcp_url) result = client.delete_cookies() if "error" in result: return f"❌ 退出失败: {result['error']}" cfg.set("xsec_token", "") client._reset() return "✅ 已退出登录,可以重新扫码登录" except Exception as e: logger.error("退出登录失败: %s", e) return f"❌ 退出失败: {e}" def _auto_fetch_xsec_token(mcp_url) -> str: """从推荐列表自动获取一个有效的 xsec_token""" try: client = get_mcp_client(mcp_url) entries = client.list_feeds_parsed() for e in entries: token = e.get("xsec_token", "") if token: return token except Exception as e: logger.warning("自动获取 xsec_token 失败: %s", e) return "" def check_login(mcp_url): """检查登录状态,登录成功后自动获取 xsec_token 并保存""" try: client = get_mcp_client(mcp_url) result = client.check_login_status() if "error" in result: return f"❌ {result['error']}", gr.update(), gr.update() text = result.get("text", "") if "未登录" in text: return f"🔴 {text}", gr.update(), gr.update() # 登录成功 → 自动获取 xsec_token token = _auto_fetch_xsec_token(mcp_url) if token: cfg.set("xsec_token", token) logger.info("自动获取 xsec_token 成功") return ( f"🟢 {text}\n\n✅ xsec_token 已自动获取并保存", gr.update(value=cfg.get("my_user_id", "")), gr.update(value=token), ) return f"🟢 {text}\n\n⚠️ 自动获取 xsec_token 失败,请手动刷新", gr.update(), gr.update() except Exception as e: return f"❌ 检查登录状态失败: {e}", gr.update(), gr.update() def save_my_user_id(user_id_input): """保存用户 ID (验证 24 位十六进制格式)""" uid = (user_id_input or "").strip() if not uid: cfg.set("my_user_id", "") return "⚠️ 已清除用户 ID" if not re.match(r'^[0-9a-fA-F]{24}$', uid): return ( "❌ 格式错误!用户 ID 应为 24 位十六进制字符串\n" f"你输入的: `{uid}` ({len(uid)} 位)\n\n" "💡 如果你输入的是小红书号 (纯数字如 18688457507),那不是 userId。" ) cfg.set("my_user_id", uid) return f"✅ 用户 ID 已保存: `{uid}`" # ================= 头像/换脸管理 ================= def upload_face_image(img): """上传并保存头像图片""" if img is None: return None, "❌ 请上传头像图片" try: if isinstance(img, str) and os.path.isfile(img): img = Image.open(img).convert("RGB") elif not isinstance(img, Image.Image): return None, "❌ 无法识别图片格式" path = SDService.save_face_image(img) return img, f"✅ 头像已保存至 {os.path.basename(path)}" except Exception as e: return None, f"❌ 保存失败: {e}" def load_saved_face_image(): """加载已保存的头像""" img = SDService.load_face_image() if img: return img, "✅ 已加载保存的头像" return None, "ℹ️ 尚未设置头像" def generate_copy(model, topic, style, sd_model_name, persona_text): """生成文案(自动适配 SD 模型的 prompt 风格,支持人设)""" api_key, base_url, _ = _get_llm_config() if not api_key: return "", "", "", "", "❌ 请先配置并连接 LLM 提供商" try: svc = LLMService(api_key, base_url, model) persona = _resolve_persona(persona_text) if persona_text else None data = svc.generate_copy(topic, style, sd_model_name=sd_model_name, persona=persona) cfg.set("model", model) tags = data.get("tags", []) return ( data.get("title", ""), data.get("content", ""), data.get("sd_prompt", ""), ", ".join(tags) if tags else "", "✅ 文案生成完毕", ) except Exception as e: logger.error("文案生成失败: %s", e) return "", "", "", "", f"❌ 生成失败: {e}" def generate_images(sd_url, prompt, neg_prompt, model, steps, cfg_scale, face_swap_on, face_img, quality_mode): """生成图片(可选 ReActor 换脸,支持质量模式预设)""" if not model: return None, [], "❌ 未选择 SD 模型" try: svc = SDService(sd_url) # 判断是否启用换脸 face_image = None if face_swap_on: # Gradio 可能传 PIL.Image / numpy.ndarray / 文件路径 / None if face_img is not None: if isinstance(face_img, Image.Image): face_image = face_img elif isinstance(face_img, str) and os.path.isfile(face_img): face_image = Image.open(face_img).convert("RGB") else: # numpy array 等其他格式 try: import numpy as np if isinstance(face_img, np.ndarray): face_image = Image.fromarray(face_img).convert("RGB") logger.info("头像从 numpy array 转换为 PIL Image") except Exception as e: logger.warning("头像格式转换失败 (%s): %s", type(face_img).__name__, e) # 如果 UI 没传有效头像,从本地文件加载 if face_image is None: face_image = SDService.load_face_image() if face_image is not None: logger.info("换脸头像已就绪: %dx%d", face_image.width, face_image.height) else: logger.warning("换脸已启用但未找到有效头像") images = svc.txt2img( prompt=prompt, negative_prompt=neg_prompt, model=model, steps=int(steps), cfg_scale=float(cfg_scale), face_image=face_image, quality_mode=quality_mode, ) preset = get_sd_preset(quality_mode) swap_hint = " (已换脸)" if face_image else "" return images, images, f"✅ 生成 {len(images)} 张图片{swap_hint} [{quality_mode}]" except Exception as e: logger.error("图片生成失败: %s", e) return None, [], f"❌ 绘图失败: {e}" def one_click_export(title, content, images): """导出文案和图片到本地""" if not title: return "❌ 无法导出:没有标题" safe_title = re.sub(r'[\\/*?:"<>|]', "", title)[:20] folder_name = f"{int(time.time())}_{safe_title}" folder_path = os.path.join(OUTPUT_DIR, folder_name) os.makedirs(folder_path, exist_ok=True) with open(os.path.join(folder_path, "文案.txt"), "w", encoding="utf-8") as f: f.write(f"{title}\n\n{content}") saved_paths = [] if images: for idx, img in enumerate(images): path = os.path.join(folder_path, f"图{idx+1}.jpg") if isinstance(img, Image.Image): if img.mode != "RGB": img = img.convert("RGB") img.save(path, format="JPEG", quality=95) saved_paths.append(os.path.abspath(path)) # 尝试打开文件夹 try: abs_path = os.path.abspath(folder_path) if platform.system() == "Windows": os.startfile(abs_path) elif platform.system() == "Darwin": subprocess.call(["open", abs_path]) else: subprocess.call(["xdg-open", abs_path]) except Exception: pass return f"✅ 已导出至: {folder_path} ({len(saved_paths)} 张图片)" def publish_to_xhs(title, content, tags_str, images, local_images, mcp_url, schedule_time): """通过 MCP 发布到小红书""" if not title: return "❌ 缺少标题" client = get_mcp_client(mcp_url) # 收集图片路径 image_paths = [] # 先保存 AI 生成的图片到临时目录 if images: temp_dir = os.path.join(OUTPUT_DIR, "_temp_publish") os.makedirs(temp_dir, exist_ok=True) for idx, img in enumerate(images): if isinstance(img, Image.Image): path = os.path.abspath(os.path.join(temp_dir, f"ai_{idx}.jpg")) if img.mode != "RGB": img = img.convert("RGB") img.save(path, format="JPEG", quality=95) image_paths.append(path) # 添加本地上传的图片 if local_images: for img_file in local_images: # Gradio File 组件返回的是 NamedString 或 tempfile path img_path = img_file.name if hasattr(img_file, 'name') else str(img_file) if os.path.exists(img_path): image_paths.append(os.path.abspath(img_path)) if not image_paths: return "❌ 至少需要 1 张图片才能发布" # 解析标签 tags = [t.strip().lstrip("#") for t in tags_str.split(",") if t.strip()] if tags_str else None # 定时发布 schedule = schedule_time if schedule_time and schedule_time.strip() else None try: result = client.publish_content( title=title, content=content, images=image_paths, tags=tags, schedule_at=schedule, ) if "error" in result: return f"❌ 发布失败: {result['error']}" return f"✅ 发布成功!\n{result.get('text', '')}" except Exception as e: logger.error("发布失败: %s", e) return f"❌ 发布异常: {e}" # ================================================== # Tab 2: 热点探测 # ================================================== def search_hotspots(keyword, sort_by, mcp_url): """搜索小红书热门内容""" if not keyword: return "❌ 请输入搜索关键词", "" try: client = get_mcp_client(mcp_url) result = client.search_feeds(keyword, sort_by=sort_by) if "error" in result: return f"❌ 搜索失败: {result['error']}", "" text = result.get("text", "无结果") return "✅ 搜索完成", text except Exception as e: logger.error("热点搜索失败: %s", e) return f"❌ 搜索失败: {e}", "" def analyze_and_suggest(model, keyword, search_result): """AI 分析热点并给出建议""" if not search_result: return "❌ 请先搜索", "", "" api_key, base_url, _ = _get_llm_config() if not api_key: return "❌ 请先配置 LLM 提供商", "", "" try: svc = LLMService(api_key, base_url, model) analysis = svc.analyze_hotspots(search_result) topics = "\n".join(f"• {t}" for t in analysis.get("hot_topics", [])) patterns = "\n".join(f"• {p}" for p in analysis.get("title_patterns", [])) suggestions = "\n".join( f"**{s['topic']}** - {s['reason']}" for s in analysis.get("suggestions", []) ) structure = analysis.get("content_structure", "") summary = ( f"## 🔥 热门选题\n{topics}\n\n" f"## 📝 标题套路\n{patterns}\n\n" f"## 📐 内容结构\n{structure}\n\n" f"## 💡 推荐选题\n{suggestions}" ) return "✅ 分析完成", summary, keyword except Exception as e: logger.error("热点分析失败: %s", e) return f"❌ 分析失败: {e}", "", "" def generate_from_hotspot(model, topic_from_hotspot, style, search_result, sd_model_name, persona_text): """基于热点分析生成文案(自动适配 SD 模型,支持人设)""" if not topic_from_hotspot: return "", "", "", "", "❌ 请先选择或输入选题" api_key, base_url, _ = _get_llm_config() if not api_key: return "", "", "", "", "❌ 请先配置 LLM 提供商" try: svc = LLMService(api_key, base_url, model) persona = _resolve_persona(persona_text) if persona_text else None data = svc.generate_copy_with_reference( topic=topic_from_hotspot, style=style, reference_notes=search_result[:2000], sd_model_name=sd_model_name, persona=persona, ) tags = data.get("tags", []) return ( data.get("title", ""), data.get("content", ""), data.get("sd_prompt", ""), ", ".join(tags), "✅ 基于热点的文案已生成", ) except Exception as e: return "", "", "", "", f"❌ 生成失败: {e}" # ================================================== # Tab 3: 评论管家 # ================================================== # ---- 共用: 笔记列表缓存 ---- # 主动评论缓存 _cached_proactive_entries: list[dict] = [] # 我的笔记评论缓存 _cached_my_note_entries: list[dict] = [] def _fetch_and_cache(keyword, mcp_url, cache_name="proactive"): """通用: 获取笔记列表并缓存""" global _cached_proactive_entries, _cached_my_note_entries try: client = get_mcp_client(mcp_url) if keyword and keyword.strip(): entries = client.search_feeds_parsed(keyword.strip()) src = f"搜索「{keyword.strip()}」" else: entries = client.list_feeds_parsed() src = "首页推荐" if cache_name == "proactive": _cached_proactive_entries = entries else: _cached_my_note_entries = entries if not entries: return gr.update(choices=[], value=None), f"⚠️ 从{src}未找到笔记" choices = [] for i, e in enumerate(entries): title_short = (e["title"] or "无标题")[:28] label = f"[{i+1}] {title_short} | @{e['author'] or '未知'} | ❤ {e['likes']}" choices.append(label) return ( gr.update(choices=choices, value=choices[0]), f"✅ 从{src}获取 {len(entries)} 条笔记", ) except Exception as e: if cache_name == "proactive": _cached_proactive_entries = [] else: _cached_my_note_entries = [] return gr.update(choices=[], value=None), f"❌ {e}" def _pick_from_cache(selected, cache_name="proactive"): """通用: 从缓存中提取选中条目的 feed_id / xsec_token / title""" cache = _cached_proactive_entries if cache_name == "proactive" else _cached_my_note_entries if not selected or not cache: return "", "", "" try: # 尝试从 [N] 前缀提取序号 idx = int(selected.split("]")[0].replace("[", "")) - 1 if 0 <= idx < len(cache): e = cache[idx] return e["feed_id"], e["xsec_token"], e.get("title", "") except (ValueError, IndexError): pass # 回退: 模糊匹配标题 for e in cache: if e.get("title", "")[:15] in selected: return e["feed_id"], e["xsec_token"], e.get("title", "") return "", "", "" # ---- 模块 A: 主动评论他人 ---- def fetch_proactive_notes(keyword, mcp_url): return _fetch_and_cache(keyword, mcp_url, "proactive") def on_proactive_note_selected(selected): return _pick_from_cache(selected, "proactive") def load_note_for_comment(feed_id, xsec_token, mcp_url): """加载目标笔记详情 (标题+正文+已有评论), 用于 AI 分析""" if not feed_id or not xsec_token: return "❌ 请先选择笔记", "", "", "" try: client = get_mcp_client(mcp_url) result = client.get_feed_detail(feed_id, xsec_token, load_all_comments=True) if "error" in result: return f"❌ {result['error']}", "", "", "" full_text = result.get("text", "") # 尝试分离正文和评论 if "评论" in full_text: parts = full_text.split("评论", 1) content_part = parts[0].strip() comments_part = "评论" + parts[1] if len(parts) > 1 else "" else: content_part = full_text[:500] comments_part = "" return "✅ 笔记内容已加载", content_part[:800], comments_part[:1500], full_text except Exception as e: return f"❌ {e}", "", "", "" def ai_generate_comment(model, persona, post_title, post_content, existing_comments): """AI 生成主动评论""" persona = _resolve_persona(persona) api_key, base_url, _ = _get_llm_config() if not api_key: return "⚠️ 请先配置 LLM 提供商", "❌ LLM 未配置" if not model: return "⚠️ 请先连接 LLM", "❌ 未选模型" if not post_title and not post_content: return "⚠️ 请先加载笔记内容", "❌ 无笔记内容" try: svc = LLMService(api_key, base_url, model) comment = svc.generate_proactive_comment( persona, post_title, post_content[:600], existing_comments[:800] ) return comment, "✅ 评论已生成" except Exception as e: logger.error(f"AI 评论生成失败: {e}") return f"生成失败: {e}", f"❌ {e}" def send_comment(feed_id, xsec_token, comment_content, mcp_url): """发送评论到别人的笔记""" if not all([feed_id, xsec_token, comment_content]): return "❌ 缺少必要参数 (笔记ID / token / 评论内容)" try: client = get_mcp_client(mcp_url) result = client.post_comment(feed_id, xsec_token, comment_content) if "error" in result: return f"❌ {result['error']}" return "✅ 评论已发送!" except Exception as e: return f"❌ {e}" # ---- 模块 B: 回复我的笔记评论 ---- def fetch_my_notes(mcp_url): """通过已保存的 userId 获取我的笔记列表""" global _cached_my_note_entries my_uid = cfg.get("my_user_id", "") xsec = cfg.get("xsec_token", "") if not my_uid: return ( gr.update(choices=[], value=None), "❌ 未配置用户 ID,请先到「账号登录」页填写并保存", ) if not xsec: return ( gr.update(choices=[], value=None), "❌ 未获取 xsec_token,请先登录", ) try: client = get_mcp_client(mcp_url) result = client.get_user_profile(my_uid, xsec) if "error" in result: return gr.update(choices=[], value=None), f"❌ {result['error']}" # 从 raw 中解析 feeds raw = result.get("raw", {}) text = result.get("text", "") data = None if raw and isinstance(raw, dict): for item in raw.get("content", []): if item.get("type") == "text": try: data = json.loads(item["text"]) except (json.JSONDecodeError, KeyError): pass if not data: try: data = json.loads(text) except (json.JSONDecodeError, TypeError): pass feeds = (data or {}).get("feeds") or [] if not feeds: return ( gr.update(choices=[], value=None), "⚠️ 未找到你的笔记,可能账号还没有发布内容", ) entries = [] for f in feeds: nc = f.get("noteCard") or {} user = nc.get("user") or {} interact = nc.get("interactInfo") or {} entries.append({ "feed_id": f.get("id", ""), "xsec_token": f.get("xsecToken", ""), "title": nc.get("displayTitle", "未知标题"), "author": user.get("nickname", user.get("nickName", "")), "user_id": user.get("userId", ""), "likes": interact.get("likedCount", "0"), "type": nc.get("type", ""), }) _cached_my_note_entries = entries choices = [ f"[{i+1}] {e['title'][:20]} | {e['type']} | ❤{e['likes']}" for i, e in enumerate(entries) ] return ( gr.update(choices=choices, value=choices[0] if choices else None), f"✅ 找到 {len(entries)} 篇笔记", ) except Exception as e: return gr.update(choices=[], value=None), f"❌ {e}" def on_my_note_selected(selected): return _pick_from_cache(selected, "my_notes") def fetch_my_note_comments(feed_id, xsec_token, mcp_url): """获取我的笔记的评论列表""" if not feed_id or not xsec_token: return "❌ 请先选择笔记", "" try: client = get_mcp_client(mcp_url) result = client.get_feed_detail(feed_id, xsec_token, load_all_comments=True) if "error" in result: return f"❌ {result['error']}", "" return "✅ 评论加载完成", result.get("text", "暂无评论") except Exception as e: return f"❌ {e}", "" def ai_reply_comment(model, persona, post_title, comment_text): """AI 生成评论回复""" persona = _resolve_persona(persona) api_key, base_url, _ = _get_llm_config() if not api_key: return "⚠️ 请先配置 LLM 提供商", "❌ LLM 未配置" if not model: return "⚠️ 请先连接 LLM 并选择模型", "❌ 未选择模型" if not comment_text: return "请输入需要回复的评论内容", "⚠️ 请输入评论" try: svc = LLMService(api_key, base_url, model) reply = svc.generate_reply(persona, post_title, comment_text) return reply, "✅ 回复已生成" except Exception as e: logger.error(f"AI 回复生成失败: {e}") return f"生成失败: {e}", f"❌ {e}" def send_reply(feed_id, xsec_token, reply_content, mcp_url): """发送评论回复""" if not all([feed_id, xsec_token, reply_content]): return "❌ 缺少必要参数" try: client = get_mcp_client(mcp_url) result = client.post_comment(feed_id, xsec_token, reply_content) if "error" in result: return f"❌ 回复失败: {result['error']}" return "✅ 回复已发送" except Exception as e: return f"❌ 发送失败: {e}" # ================================================== # Tab 4: 数据看板 (我的账号) # ================================================== def _parse_profile_json(text: str): """尝试从文本中解析用户 profile JSON""" if not text: return None # 直接 JSON try: return json.loads(text) except (json.JSONDecodeError, TypeError): pass # 可能包含 Markdown 代码块 m = re.search(r'```(?:json)?\s*\n([\s\S]+?)\n```', text) if m: try: return json.loads(m.group(1)) except (json.JSONDecodeError, TypeError): pass return None def _parse_count(val) -> float: """解析数字字符串, 支持 '1.2万' 格式""" if isinstance(val, (int, float)): return float(val) s = str(val).strip() if "万" in s: try: return float(s.replace("万", "")) * 10000 except ValueError: pass try: return float(s) except ValueError: return 0.0 def fetch_my_profile(user_id, xsec_token, mcp_url): """获取我的账号数据, 返回结构化信息 + 可视化图表""" if not user_id or not xsec_token: return "❌ 请填写你的用户 ID 和 xsec_token", "", None, None, None try: client = get_mcp_client(mcp_url) result = client.get_user_profile(user_id, xsec_token) if "error" in result: return f"❌ {result['error']}", "", None, None, None raw = result.get("raw", {}) text = result.get("text", "") # 尝试从 raw 或 text 解析 JSON data = None if raw and isinstance(raw, dict): content_list = raw.get("content", []) for item in content_list: if item.get("type") == "text": data = _parse_profile_json(item.get("text", "")) if data: break if not data: data = _parse_profile_json(text) if not data: return "✅ 数据加载完成 (纯文本)", text, None, None, None # ---- 提取基本信息 (注意 MCP 对新号可能返回 null) ---- basic = data.get("userBasicInfo") or {} interactions = data.get("interactions") or [] feeds = data.get("feeds") or [] gender_map = {0: "未知", 1: "男", 2: "女"} info_lines = [ f"## 👤 {basic.get('nickname', '未知')}", f"- **小红书号**: {basic.get('redId', '-')}", f"- **性别**: {gender_map.get(basic.get('gender', 0), '未知')}", f"- **IP 属地**: {basic.get('ipLocation', '-')}", f"- **简介**: {basic.get('desc', '-')}", "", "### 📊 核心数据", ] for inter in interactions: info_lines.append(f"- **{inter.get('name', '')}**: {inter.get('count', '0')}") info_lines.append(f"\n### 📝 展示笔记: {len(feeds)} 篇") profile_md = "\n".join(info_lines) # ---- 互动数据柱状图 ---- fig_interact = None if interactions: inter_data = {i["name"]: _parse_count(i["count"]) for i in interactions} fig_interact, ax = plt.subplots(figsize=(4, 3), dpi=100) labels = list(inter_data.keys()) values = list(inter_data.values()) colors = ["#FF6B6B", "#4ECDC4", "#45B7D1"][:len(labels)] ax.bar(labels, values, color=colors, edgecolor="white", linewidth=0.5) ax.set_title("账号核心指标", fontsize=12, fontweight="bold") for i, v in enumerate(values): display = f"{v/10000:.1f}万" if v >= 10000 else str(int(v)) ax.text(i, v + max(values) * 0.02, display, ha="center", fontsize=9) ax.set_ylabel("") ax.spines["top"].set_visible(False) ax.spines["right"].set_visible(False) fig_interact.tight_layout() # ---- 笔记点赞分布图 ---- fig_notes = None if feeds: titles, likes = [], [] for f in feeds[:15]: nc = f.get("noteCard") or {} t = (nc.get("displayTitle", "") or "无标题")[:12] lk = _parse_count((nc.get("interactInfo") or {}).get("likedCount", "0")) titles.append(t) likes.append(lk) fig_notes, ax2 = plt.subplots(figsize=(7, 3.5), dpi=100) ax2.barh(range(len(titles)), likes, color="#FF6B6B", edgecolor="white") ax2.set_yticks(range(len(titles))) ax2.set_yticklabels(titles, fontsize=8) ax2.set_title(f"笔记点赞排行 (Top {len(titles)})", fontsize=12, fontweight="bold") ax2.invert_yaxis() for i, v in enumerate(likes): display = f"{v/10000:.1f}万" if v >= 10000 else str(int(v)) ax2.text(v + max(likes) * 0.01 if max(likes) > 0 else 0, i, display, va="center", fontsize=8) ax2.spines["top"].set_visible(False) ax2.spines["right"].set_visible(False) fig_notes.tight_layout() # ---- 笔记详情表格 (Markdown) ---- table_lines = [ "### 📋 笔记数据明细", "| # | 标题 | 类型 | ❤ 点赞 |", "|---|------|------|--------|", ] for i, f in enumerate(feeds): nc = f.get("noteCard") or {} t = (nc.get("displayTitle", "") or "无标题")[:25] tp = "📹 视频" if nc.get("type") == "video" else "📷 图文" lk = (nc.get("interactInfo") or {}).get("likedCount", "0") table_lines.append(f"| {i+1} | {t} | {tp} | {lk} |") notes_table = "\n".join(table_lines) return "✅ 数据加载完成", profile_md, fig_interact, fig_notes, notes_table except Exception as e: logger.error(f"获取我的数据失败: {e}") return f"❌ {e}", "", None, None, None # ================================================== # 自动化运营模块 # ================================================== # 自动化状态 _auto_running = threading.Event() _auto_thread: threading.Thread | None = None _auto_log: list[str] = [] # ---- 操作记录:防重复 & 每日统计 ---- _op_history = { "commented_feeds": set(), # 已评论的 feed_id "replied_comments": set(), # 已回复的 comment_id "liked_feeds": set(), # 已点赞的 feed_id "favorited_feeds": set(), # 已收藏的 feed_id } _daily_stats = { "date": "", "comments": 0, "likes": 0, "favorites": 0, "publishes": 0, "replies": 0, "errors": 0, } # 每日操作上限 DAILY_LIMITS = { "comments": 30, "likes": 80, "favorites": 50, "publishes": 8, "replies": 40, } # 连续错误计数 → 冷却 _consecutive_errors = 0 _error_cooldown_until = 0.0 def _reset_daily_stats_if_needed(): """每天自动重置统计""" today = datetime.now().strftime("%Y-%m-%d") if _daily_stats["date"] != today: _daily_stats.update({ "date": today, "comments": 0, "likes": 0, "favorites": 0, "publishes": 0, "replies": 0, "errors": 0, }) # 每日重置历史记录(允许隔天重复互动) for k in _op_history: _op_history[k].clear() def _check_daily_limit(op_type: str) -> bool: """检查是否超出每日限额""" _reset_daily_stats_if_needed() limit = DAILY_LIMITS.get(op_type, 999) current = _daily_stats.get(op_type, 0) return current < limit def _increment_stat(op_type: str): """增加操作计数""" _reset_daily_stats_if_needed() _daily_stats[op_type] = _daily_stats.get(op_type, 0) + 1 def _record_error(): """记录错误,连续错误触发冷却""" global _consecutive_errors, _error_cooldown_until _consecutive_errors += 1 _daily_stats["errors"] = _daily_stats.get("errors", 0) + 1 if _consecutive_errors >= 3: cooldown = min(60 * _consecutive_errors, 600) # 最多冷却10分钟 _error_cooldown_until = time.time() + cooldown _auto_log_append(f"⚠️ 连续 {_consecutive_errors} 次错误,冷却 {cooldown}s") def _clear_error_streak(): """操作成功后清除连续错误记录""" global _consecutive_errors _consecutive_errors = 0 def _is_in_cooldown() -> bool: """检查是否在错误冷却期""" return time.time() < _error_cooldown_until def _is_in_operating_hours(start_hour: int = 7, end_hour: int = 23) -> bool: """检查是否在运营时间段""" now_hour = datetime.now().hour return start_hour <= now_hour < end_hour def _get_stats_summary() -> str: """获取今日运营统计摘要""" _reset_daily_stats_if_needed() s = _daily_stats lines = [ f"📊 **今日运营统计** ({s['date']})", f"- 💬 评论: {s['comments']}/{DAILY_LIMITS['comments']}", f"- ❤️ 点赞: {s['likes']}/{DAILY_LIMITS['likes']}", f"- ⭐ 收藏: {s['favorites']}/{DAILY_LIMITS['favorites']}", f"- 🚀 发布: {s['publishes']}/{DAILY_LIMITS['publishes']}", f"- 💌 回复: {s['replies']}/{DAILY_LIMITS['replies']}", f"- ❌ 错误: {s['errors']}", ] return "\n".join(lines) # ================= 人设池 ================= DEFAULT_PERSONAS = [ "身材管理健身美女,热爱分享好身材秘诀和穿搭显身材技巧", "温柔知性的时尚博主,喜欢分享日常穿搭和生活美学", "元气满满的大学生,热爱探店和平价好物分享", "30岁都市白领丽人,专注通勤穿搭和职场干货", "精致妈妈,分享育儿经验和家居收纳技巧", "文艺青年摄影师,喜欢记录旅行和城市角落", "健身达人营养师,专注减脂餐和运动分享", "资深美妆博主,擅长化妆教程和护肤测评", "独居女孩,分享租房改造和独居生活仪式感", "甜品烘焙爱好者,热衷分享自制甜点和下午茶", "数码科技女生,专注好用App和电子产品测评", "小镇姑娘在大城市打拼,分享省钱攻略和成长日记", "中医养生爱好者,分享节气养生和食疗方子", "二次元coser,喜欢分享cos日常和动漫周边", "北漂程序媛,分享高效工作法和解压生活", "复古穿搭博主,热爱vintage风和中古饰品", "考研上岸学姐,分享学习方法和备考经验", "新手养猫人,记录和毛孩子的日常生活", "咖啡重度爱好者,探遍城市独立咖啡馆", "极简主义生活家,倡导断舍离和高质量生活", "汉服爱好者,分享传统文化和国风穿搭", "插画师小姐姐,分享手绘过程和创作灵感", "海归女孩,分享中西文化差异和海外生活见闻", "瑜伽老师,分享身心灵修行和自律生活", "美甲设计师,分享流行甲型和美甲教程", "家居软装设计师,分享小户型改造和氛围感布置", ] RANDOM_PERSONA_LABEL = "🎲 随机人设(每次自动切换)" # ================= 人设 → 分类关键词/主题池映射 ================= # 每个人设对应一组相符的评论关键词和主题,切换人设时自动同步 PERSONA_POOL_MAP = { # ---- 身材管理类 ---- "身材管理健身美女": { "topics": [ "好身材穿搭", "显身材穿搭", "马甲线养成", "翘臀训练", "直角肩养成", "天鹅颈锻炼", "小蛮腰秘诀", "腿型矫正", "体态管理", "维密身材", "居家塑形", "健身穿搭", "运动内衣测评", "蜜桃臀训练", "锁骨养成", "紧身穿搭", "比基尼身材", "纤腰丰臀", "身材对比照", "自律打卡", ], "keywords": [ "身材", "好身材", "马甲线", "翘臀", "直角肩", "天鹅颈", "小蛮腰", "健身女孩", "塑形", "体态", "蜜桃臀", "腰臀比", "紧身", "显身材", "维密", "锁骨", "A4腰", "漫画腿", ], }, # ---- 时尚穿搭类 ---- "温柔知性的时尚博主": { "topics": [ "春季穿搭", "通勤穿搭", "约会穿搭", "显瘦穿搭", "法式穿搭", "极简穿搭", "氛围感穿搭", "一衣多穿", "秋冬叠穿", "夏日清凉穿搭", "生活美学", "衣橱整理", "配色技巧", "基础款穿搭", "轻熟风穿搭", ], "keywords": [ "穿搭", "ootd", "早春穿搭", "通勤穿搭", "显瘦", "法式穿搭", "极简风", "氛围感", "轻熟风", "高级感穿搭", "配色", ], }, "元气满满的大学生": { "topics": [ "学生党穿搭", "宿舍美食", "平价好物", "校园生活", "学生党护肤", "期末复习", "社团活动", "寝室改造", "奶茶测评", "拍照打卡地", "一人食食谱", "考研经验", "实习经验", "省钱攻略", ], "keywords": [ "学生党", "平价好物", "宿舍", "校园", "奶茶", "探店", "拍照", "省钱", "大学生活", "期末", "开学", "室友", ], }, "30岁都市白领丽人": { "topics": [ "通勤穿搭", "职场干货", "面试技巧", "简历优化", "时间管理", "理财入门", "轻熟风穿搭", "职场妆容", "咖啡探店", "高效工作法", "副业分享", "自律生活", "下班后充电", "职场人际关系", ], "keywords": [ "通勤穿搭", "职场", "面试", "理财", "自律", "高效", "咖啡", "轻熟", "白领", "上班族", "时间管理", "副业", ], }, "精致妈妈": { "topics": [ "育儿经验", "家居收纳", "辅食制作", "亲子游", "母婴好物", "宝宝穿搭", "早教启蒙", "产后恢复", "家常菜做法", "小户型收纳", "家庭教育", "孕期护理", "宝宝辅食", "妈妈穿搭", ], "keywords": [ "育儿", "收纳", "辅食", "母婴", "亲子", "早教", "宝宝", "家居", "待产", "产后", "妈妈", "家常菜", ], }, "文艺青年摄影师": { "topics": [ "旅行攻略", "小众旅行地", "拍照打卡地", "城市citywalk", "古镇旅行", "手机摄影技巧", "胶片摄影", "人像摄影", "风光摄影", "街拍", "咖啡探店", "文艺书店", "展览打卡", "独立书店", ], "keywords": [ "旅行", "摄影", "打卡", "citywalk", "胶片", "拍照", "小众", "展览", "文艺", "街拍", "风光", "人像", ], }, "健身达人营养师": { "topics": [ "减脂餐分享", "居家健身", "帕梅拉跟练", "跑步入门", "体态矫正", "增肌餐", "蛋白质补充", "运动穿搭", "健身房攻略", "马甲线养成", "热量计算", "健康早餐", "运动恢复", "减脂食谱", ], "keywords": [ "减脂", "健身", "减脂餐", "蛋白质", "体态", "马甲线", "帕梅拉", "跑步", "热量", "增肌", "运动", "健康餐", ], }, "资深美妆博主": { "topics": [ "妆容教程", "眼妆教程", "唇妆合集", "底妆测评", "护肤心得", "防晒测评", "学生党平价护肤", "敏感肌护肤", "美白攻略", "成分党护肤", "换季护肤", "早C晚A护肤", "抗老护肤", ], "keywords": [ "护肤", "化妆教程", "眼影", "口红", "底妆", "防晒", "美白", "敏感肌", "成分", "平价", "测评", "粉底", ], }, "独居女孩": { "topics": [ "独居生活", "租房改造", "氛围感房间", "一人食食谱", "好物分享", "香薰推荐", "居家好物", "断舍离", "仪式感生活", "独居安全", "解压方式", "emo急救指南", "桌面布置", "小户型装修", ], "keywords": [ "独居", "租房改造", "好物", "氛围感", "一人食", "仪式感", "解压", "居家", "香薰", "ins风", "房间", "断舍离", ], }, "甜品烘焙爱好者": { "topics": [ "烘焙教程", "0失败甜品", "下午茶推荐", "蛋糕教程", "面包制作", "饼干烘焙", "奶油裱花", "巧克力甜品", "网红甜品", "便当制作", "早餐食谱", "咖啡配甜品", "节日甜品", "低卡甜品", ], "keywords": [ "烘焙", "甜品", "蛋糕", "面包", "下午茶", "曲奇", "裱花", "抹茶", "巧克力", "奶油", "食谱", "烤箱", ], }, "数码科技女生": { "topics": [ "iPad生产力", "手机摄影技巧", "好用App推荐", "电子产品测评", "桌面布置", "数码好物", "耳机测评", "平板学习", "生产力工具", "手机壳推荐", "充电设备", "智能家居", ], "keywords": [ "iPad", "App推荐", "数码", "测评", "手机", "耳机", "桌面", "科技", "电子产品", "平板", "生产力", "充电", ], }, "小镇姑娘在大城市打拼": { "topics": [ "省钱攻略", "成长日记", "平价好物", "租房改造", "副业分享", "理财入门", "独居生活", "面试技巧", "通勤穿搭", "自律生活", "城市生存指南", "女性成长", "攒钱计划", ], "keywords": [ "省钱", "平价", "租房", "副业", "理财", "成长", "自律", "打工", "攒钱", "面试", "独居", "北漂", ], }, "中医养生爱好者": { "topics": [ "节气养生", "食疗方子", "泡脚养生", "体质调理", "艾灸", "中药茶饮", "作息调整", "经络按摩", "养胃食谱", "祛湿方法", "睡眠改善", "女性调理", "养生汤", "二十四节气", ], "keywords": [ "养生", "食疗", "泡脚", "中医", "艾灸", "祛湿", "节气", "体质", "养胃", "经络", "调理", "药膳", ], }, "二次元coser": { "topics": [ "cos日常", "动漫周边", "漫展攻略", "cos化妆教程", "假发造型", "lolita穿搭", "二次元好物", "手办收藏", "动漫推荐", "cos道具制作", "jk穿搭", "谷子收藏", "二次元摄影", ], "keywords": [ "cos", "动漫", "二次元", "漫展", "lolita", "手办", "jk", "假发", "谷子", "周边", "番剧", "coser", ], }, "北漂程序媛": { "topics": [ "高效工作法", "程序员日常", "好用App推荐", "副业分享", "自律生活", "时间管理", "iPad生产力", "解压方式", "通勤穿搭", "理财入门", "独居生活", "技术学习", "面试经验", "桌面布置", ], "keywords": [ "程序员", "高效", "App推荐", "自律", "副业", "iPad", "技术", "工作", "北漂", "面试", "代码", "桌面", ], }, "复古穿搭博主": { "topics": [ "vintage风穿搭", "中古饰品", "复古妆容", "二手vintage", "古着穿搭", "法式穿搭", "复古包包", "跳蚤市场", "旧物改造", "港风穿搭", "文艺穿搭", "配饰搭配", "vintage探店", ], "keywords": [ "vintage", "复古", "中古", "古着", "港风", "法式", "饰品", "二手", "旧物", "跳蚤市场", "复古穿搭", "文艺", ], }, "考研上岸学姐": { "topics": [ "考研经验", "英语学习方法", "书单推荐", "时间管理", "自律生活", "考研择校", "政治复习", "数学刷题", "考研英语", "复试经验", "专业课复习", "考研心态", "背诵技巧", "刷题方法", ], "keywords": [ "考研", "英语学习", "书单", "自律", "学习方法", "上岸", "刷题", "备考", "复习", "笔记", "时间管理", "择校", ], }, "新手养猫人": { "topics": [ "养猫日常", "猫粮测评", "猫咪用品", "新手养宠指南", "猫咪健康", "猫咪行为", "驱虫攻略", "猫砂测评", "猫玩具推荐", "猫咪拍照", "多猫家庭", "领养代替购买", "猫咪绝育", ], "keywords": [ "养猫", "猫粮", "猫咪", "宠物", "猫砂", "驱虫", "铲屎官", "喵喵", "猫玩具", "猫零食", "新手养猫", "猫咪日常", ], }, "咖啡重度爱好者": { "topics": [ "咖啡探店", "手冲咖啡", "咖啡豆推荐", "咖啡器具", "拿铁艺术", "家庭咖啡", "咖啡配甜品", "独立咖啡馆", "冷萃咖啡", "咖啡知识", "意式咖啡", "探店打卡", "咖啡拉花", ], "keywords": [ "咖啡", "手冲", "拿铁", "探店", "咖啡豆", "美式", "咖啡馆", "意式", "冷萃", "拉花", "咖啡器具", "独立咖啡馆", ], }, "极简主义生活家": { "topics": [ "断舍离", "极简生活", "收纳技巧", "高质量生活", "减法生活", "胶囊衣橱", "极简护肤", "环保生活", "数字断舍离", "极简穿搭", "极简房间", "消费降级", "物欲管理", ], "keywords": [ "断舍离", "极简", "收纳", "高质量", "减法", "胶囊衣橱", "简约", "环保", "整理", "少即是多", "极简风", "质感", ], }, "汉服爱好者": { "topics": [ "汉服穿搭", "国风穿搭", "传统文化", "汉服发型", "汉服配饰", "汉服拍照", "古风妆容", "汉服日常", "汉服科普", "形制科普", "古风摄影", "新中式穿搭", "汉服探店", ], "keywords": [ "汉服", "国风", "传统文化", "古风", "新中式", "形制", "发簪", "明制", "宋制", "唐制", "汉服日常", "古风摄影", ], }, "插画师小姐姐": { "topics": [ "手绘教程", "创作灵感", "iPad绘画", "插画分享", "水彩教程", "Procreate技巧", "配色方案", "角色设计", "头像绘制", "手账素材", "接稿经验", "画师日常", "绘画工具推荐", ], "keywords": [ "插画", "手绘", "Procreate", "画画", "iPad绘画", "水彩", "配色", "创作", "画师", "手账", "教程", "素材", ], }, "海归女孩": { "topics": [ "中西文化差异", "海外生活", "留学经验", "英语学习方法", "海归求职", "旅行攻略", "异国美食", "海外好物", "文化冲击", "语言学习", "签证攻略", "海归适应", "国外探店", ], "keywords": [ "留学", "海归", "英语", "海外", "文化差异", "旅行", "异国", "签证", "语言", "出国", "求职", "国外", ], }, "瑜伽老师": { "topics": [ "瑜伽入门", "冥想练习", "体态矫正", "呼吸法", "居家瑜伽", "拉伸教程", "肩颈放松", "瑜伽体式", "自律生活", "身心灵", "瑜伽穿搭", "晨练瑜伽", "睡前瑜伽", ], "keywords": [ "瑜伽", "冥想", "体态", "拉伸", "放松", "呼吸", "柔韧", "健康", "自律", "晨练", "入门", "体式", ], }, "美甲设计师": { "topics": [ "美甲教程", "流行甲型", "美甲合集", "简约美甲", "法式美甲", "手绘美甲", "季节美甲", "显白美甲", "美甲配色", "短甲美甲", "新娘美甲", "美甲工具推荐", "日式美甲", ], "keywords": [ "美甲", "甲型", "法式美甲", "手绘", "显白", "短甲", "指甲", "美甲教程", "配色", "日式美甲", "腮红甲", "猫眼甲", ], }, "家居软装设计师": { "topics": [ "小户型改造", "氛围感布置", "软装搭配", "家居好物", "收纳技巧", "客厅布置", "卧室改造", "灯光设计", "绿植布置", "装修避坑", "北欧风格", "ins风家居", "墙面装饰", ], "keywords": [ "家居", "软装", "改造", "收纳", "氛围感", "小户型", "装修", "灯光", "绿植", "北欧", "ins风", "布置", ], }, } # 为"随机人设"使用的全量池(兼容旧逻辑) DEFAULT_TOPICS = [ # 穿搭类 "春季穿搭", "通勤穿搭", "约会穿搭", "显瘦穿搭", "小个子穿搭", "学生党穿搭", "韩系穿搭", "日系穿搭", "法式穿搭", "极简穿搭", "国风穿搭", "运动穿搭", "闺蜜穿搭", "梨形身材穿搭", "微胖穿搭", "氛围感穿搭", "一衣多穿", "秋冬叠穿", "夏日清凉穿搭", # 美妆护肤类 "护肤心得", "妆容教程", "学生党平价护肤", "敏感肌护肤", "抗老护肤", "美白攻略", "眼妆教程", "唇妆合集", "底妆测评", "防晒测评", "早C晚A护肤", "成分党护肤", "换季护肤", # 美食类 "减脂餐分享", "一人食食谱", "宿舍美食", "烘焙教程", "家常菜做法", "探店打卡", "咖啡探店", "早餐食谱", "下午茶推荐", "火锅推荐", "奶茶测评", "便当制作", "0失败甜品", # 生活家居类 "好物分享", "平价好物", "居家好物", "收纳技巧", "租房改造", "小户型装修", "氛围感房间", "香薰推荐", "桌面布置", "断舍离", # 旅行出行类 "旅行攻略", "周末去哪玩", "小众旅行地", "拍照打卡地", "露营攻略", "自驾游攻略", "古镇旅行", "海岛度假", "城市citywalk", # 学习成长类 "书单推荐", "自律生活", "时间管理", "考研经验", "英语学习方法", "理财入门", "副业分享", "简历优化", "面试技巧", # 数码科技类 "iPad生产力", "手机摄影技巧", "好用App推荐", "电子产品测评", # 健身运动类 "居家健身", "帕梅拉跟练", "跑步入门", "瑜伽入门", "体态矫正", # 宠物类 "养猫日常", "养狗经验", "宠物好物", "新手养宠指南", # 情感心理类 "独居生活", "emo急救指南", "社恐自救", "女性成长", "情绪管理", ] DEFAULT_STYLES = [ "好物种草", "干货教程", "情绪共鸣", "生活Vlog", "测评避雷", "知识科普", "经验分享", "清单合集", "对比测评", "沉浸式体验", ] # 全量评论关键词池(兼容旧逻辑 / 随机人设) DEFAULT_COMMENT_KEYWORDS = [ # 穿搭时尚 "穿搭", "ootd", "早春穿搭", "通勤穿搭", "显瘦", "小个子穿搭", # 美妆护肤 "护肤", "化妆教程", "平价护肤", "防晒", "美白", "眼影", # 美食 "美食", "减脂餐", "探店", "咖啡", "烘焙", "食谱", # 生活好物 "好物推荐", "平价好物", "居家好物", "收纳", "租房改造", # 旅行 "旅行", "攻略", "打卡", "周末去哪玩", "露营", # 学习成长 "自律", "书单", "考研", "英语学习", "副业", # 生活日常 "生活日常", "独居", "vlog", "仪式感", "解压", # 健身 "减脂", "健身", "瑜伽", "体态", # 宠物 "养猫", "养狗", "宠物", ] def _match_persona_pools(persona_text: str) -> dict | None: """根据人设文本模糊匹配对应的关键词池和主题池 返回 {"topics": [...], "keywords": [...]} 或 None(未匹配) """ if not persona_text or persona_text == RANDOM_PERSONA_LABEL: return None # 精确匹配 for key, pools in PERSONA_POOL_MAP.items(): if key in persona_text or persona_text in key: return pools # 关键词模糊匹配 _CATEGORY_HINTS = { "时尚|穿搭|搭配|衣服": "温柔知性的时尚博主", "大学|学生|校园": "元气满满的大学生", "白领|职场|通勤|上班": "30岁都市白领丽人", "妈妈|育儿|宝宝|母婴": "精致妈妈", "摄影|旅行|旅游|文艺": "文艺青年摄影师", "健身|运动|减脂|增肌|营养": "健身达人营养师", "美妆|化妆|护肤|美白": "资深美妆博主", "独居|租房|一人": "独居女孩", "烘焙|甜品|蛋糕|面包": "甜品烘焙爱好者", "数码|科技|App|电子": "数码科技女生", "小镇|打拼|省钱|攒钱": "小镇姑娘在大城市打拼", "中医|养生|食疗|节气": "中医养生爱好者", "二次元|cos|动漫|漫展": "二次元coser", "程序|代码|开发|码农": "北漂程序媛", "复古|vintage|中古|古着": "复古穿搭博主", "考研|备考|上岸|学习方法": "考研上岸学姐", "猫|铲屎|喵": "新手养猫人", "咖啡|手冲|拿铁": "咖啡重度爱好者", "极简|断舍离|简约": "极简主义生活家", "汉服|国风|传统文化": "汉服爱好者", "插画|手绘|画画|绘画": "插画师小姐姐", "海归|留学|海外": "海归女孩", "瑜伽|冥想|身心灵": "瑜伽老师", "美甲|甲型|指甲": "美甲设计师", "家居|软装|装修|改造": "家居软装设计师", } for hints, persona_key in _CATEGORY_HINTS.items(): if any(h in persona_text for h in hints.split("|")): return PERSONA_POOL_MAP.get(persona_key) return None def get_persona_topics(persona_text: str) -> list[str]: """获取人设对应的主题池,未匹配则返回全量池""" pools = _match_persona_pools(persona_text) return pools["topics"] if pools else DEFAULT_TOPICS def get_persona_keywords(persona_text: str) -> list[str]: """获取人设对应的评论关键词池,未匹配则返回全量池""" pools = _match_persona_pools(persona_text) return pools["keywords"] if pools else DEFAULT_COMMENT_KEYWORDS def on_persona_changed(persona_text: str): """人设切换时联动更新评论关键词池和主题池""" keywords = get_persona_keywords(persona_text) topics = get_persona_topics(persona_text) keywords_str = ", ".join(keywords) topics_str = ", ".join(topics) matched = _match_persona_pools(persona_text) if matched: label = persona_text[:15] if len(persona_text) > 15 else persona_text hint = f"✅ 已切换至「{label}」专属关键词/主题池" else: hint = "ℹ️ 使用通用全量关键词/主题池" return keywords_str, topics_str, hint def _auto_log_append(msg: str): """记录自动化日志""" ts = datetime.now().strftime("%H:%M:%S") entry = f"[{ts}] {msg}" _auto_log.append(entry) if len(_auto_log) > 500: _auto_log[:] = _auto_log[-300:] logger.info("[自动化] %s", msg) def _resolve_persona(persona_text: str) -> str: """解析人设:如果是随机人设则从池中随机选一个,否则原样返回""" if not persona_text or persona_text == RANDOM_PERSONA_LABEL: chosen = random.choice(DEFAULT_PERSONAS) _auto_log_append(f"🎭 本次人设: {chosen[:20]}...") return chosen # 检查是否选的是池中某个人设(Dropdown选中) return persona_text def _auto_comment_with_log(keywords_str, mcp_url, model, persona_text): """一键评论 + 同步刷新日志""" msg = auto_comment_once(keywords_str, mcp_url, model, persona_text) return msg, get_auto_log() def auto_comment_once(keywords_str, mcp_url, model, persona_text): """一键评论:自动搜索高赞笔记 → AI生成评论 → 发送(含防重复/限额/冷却)""" try: if _is_in_cooldown(): return "⏳ 错误冷却中,请稍后再试" if not _check_daily_limit("comments"): return f"🚫 今日评论已达上限 ({DAILY_LIMITS['comments']})" persona_text = _resolve_persona(persona_text) # 如果用户未手动修改关键词池,则使用人设匹配的专属关键词池 persona_keywords = get_persona_keywords(persona_text) keywords = [k.strip() for k in keywords_str.split(",") if k.strip()] if keywords_str else persona_keywords keyword = random.choice(keywords) _auto_log_append(f"🔍 搜索关键词: {keyword}") client = get_mcp_client(mcp_url) # 随机切换搜索排序,丰富互动对象 sort_options = ["最多点赞", "综合", "最新"] sort_by = random.choice(sort_options) # 搜索高赞笔记 entries = client.search_feeds_parsed(keyword, sort_by=sort_by) if not entries: _auto_log_append("⚠️ 搜索无结果,尝试推荐列表") entries = client.list_feeds_parsed() if not entries: _record_error() return "❌ 未找到任何笔记" # 过滤掉自己的笔记 & 已评论过的笔记 my_uid = cfg.get("my_user_id", "") entries = [ e for e in entries if e.get("user_id") != my_uid and e.get("feed_id") not in _op_history["commented_feeds"] ] if not entries: return "ℹ️ 搜索结果中所有笔记都已评论过,换个关键词试试" # 从前10个中随机选择 target = random.choice(entries[:min(10, len(entries))]) feed_id = target["feed_id"] xsec_token = target["xsec_token"] title = target.get("title", "未知") _auto_log_append(f"🎯 选中: {title[:30]} (@{target.get('author', '未知')}) [排序:{sort_by}]") if not feed_id or not xsec_token: return "❌ 笔记缺少必要参数 (feed_id/xsec_token)" # 模拟浏览延迟 time.sleep(random.uniform(3, 8)) # 加载笔记详情 result = client.get_feed_detail(feed_id, xsec_token, load_all_comments=True) if "error" in result: _record_error() return f"❌ 加载笔记失败: {result['error']}" full_text = result.get("text", "") if "评论" in full_text: parts = full_text.split("评论", 1) content_part = parts[0].strip()[:600] comments_part = ("评论" + parts[1])[:800] if len(parts) > 1 else "" else: content_part = full_text[:500] comments_part = "" # AI 生成评论 api_key, base_url, _ = _get_llm_config() if not api_key: return "❌ LLM 未配置,请先在全局设置中配置提供商" svc = LLMService(api_key, base_url, model) comment = svc.generate_proactive_comment( persona_text, title, content_part, comments_part ) _auto_log_append(f"💬 生成评论: {comment[:60]}...") # 随机等待后发送 time.sleep(random.uniform(3, 10)) result = client.post_comment(feed_id, xsec_token, comment) resp_text = result.get("text", "") _auto_log_append(f"📡 MCP 响应: {resp_text[:200]}") if "error" in result: _record_error() _auto_log_append(f"❌ 评论发送失败: {result['error']}") return f"❌ 评论发送失败: {result['error']}" # 记录成功操作 _op_history["commented_feeds"].add(feed_id) _increment_stat("comments") _clear_error_streak() _auto_log_append(f"✅ 评论已发送到「{title[:20]}」 (今日第{_daily_stats['comments']}条)") return f"✅ 已评论「{title[:25]}」\n📝 评论: {comment}\n📊 今日评论: {_daily_stats['comments']}/{DAILY_LIMITS['comments']}" except Exception as e: _record_error() _auto_log_append(f"❌ 一键评论异常: {e}") return f"❌ 评论失败: {e}" def _auto_like_with_log(keywords_str, like_count, mcp_url): """一键点赞 + 同步刷新日志""" msg = auto_like_once(keywords_str, like_count, mcp_url) return msg, get_auto_log() def auto_like_once(keywords_str, like_count, mcp_url): """一键点赞:搜索/推荐笔记 → 随机选择 → 批量点赞(含防重复/限额)""" try: if _is_in_cooldown(): return "⏳ 错误冷却中,请稍后再试" if not _check_daily_limit("likes"): return f"🚫 今日点赞已达上限 ({DAILY_LIMITS['likes']})" keywords = [k.strip() for k in keywords_str.split(",") if k.strip()] if keywords_str else DEFAULT_COMMENT_KEYWORDS keyword = random.choice(keywords) like_count = int(like_count) if like_count else 5 # 不超过当日剩余额度 remaining = DAILY_LIMITS["likes"] - _daily_stats.get("likes", 0) like_count = min(like_count, remaining) _auto_log_append(f"👍 点赞关键词: {keyword} | 目标: {like_count} 个") client = get_mcp_client(mcp_url) # 搜索笔记 entries = client.search_feeds_parsed(keyword, sort_by="综合") if not entries: _auto_log_append("⚠️ 搜索无结果,尝试推荐列表") entries = client.list_feeds_parsed() if not entries: _record_error() return "❌ 未找到任何笔记" # 过滤自己的笔记 & 已点赞过的 my_uid = cfg.get("my_user_id", "") entries = [ e for e in entries if e.get("user_id") != my_uid and e.get("feed_id") not in _op_history["liked_feeds"] ] if not entries: return "ℹ️ 搜索结果中所有笔记都已点赞过" # 随机打乱,取前 N 个 random.shuffle(entries) targets = entries[:min(like_count, len(entries))] liked = 0 for target in targets: feed_id = target.get("feed_id", "") xsec_token = target.get("xsec_token", "") title = target.get("title", "未知")[:25] if not feed_id or not xsec_token: continue # 模拟浏览延迟 time.sleep(random.uniform(2, 6)) result = client.like_feed(feed_id, xsec_token) if "error" in result: _auto_log_append(f" ❌ 点赞失败「{title}」: {result['error']}") else: liked += 1 _op_history["liked_feeds"].add(feed_id) _increment_stat("likes") _auto_log_append(f" ❤️ 已点赞「{title}」@{target.get('author', '未知')}") if liked > 0: _clear_error_streak() _auto_log_append(f"👍 点赞完成: 成功 {liked}/{len(targets)} (今日累计{_daily_stats.get('likes', 0)})") return f"✅ 点赞完成!成功 {liked}/{len(targets)} 个\n📊 今日点赞: {_daily_stats.get('likes', 0)}/{DAILY_LIMITS['likes']}" except Exception as e: _record_error() _auto_log_append(f"❌ 一键点赞异常: {e}") return f"❌ 点赞失败: {e}" def _auto_favorite_with_log(keywords_str, fav_count, mcp_url): """一键收藏 + 同步刷新日志""" msg = auto_favorite_once(keywords_str, fav_count, mcp_url) return msg, get_auto_log() def auto_favorite_once(keywords_str, fav_count, mcp_url): """一键收藏:搜索优质笔记 → 随机选择 → 批量收藏(含防重复/限额)""" try: if _is_in_cooldown(): return "⏳ 错误冷却中,请稍后再试" if not _check_daily_limit("favorites"): return f"🚫 今日收藏已达上限 ({DAILY_LIMITS['favorites']})" keywords = [k.strip() for k in keywords_str.split(",") if k.strip()] if keywords_str else DEFAULT_COMMENT_KEYWORDS keyword = random.choice(keywords) fav_count = int(fav_count) if fav_count else 3 remaining = DAILY_LIMITS["favorites"] - _daily_stats.get("favorites", 0) fav_count = min(fav_count, remaining) _auto_log_append(f"⭐ 收藏关键词: {keyword} | 目标: {fav_count} 个") client = get_mcp_client(mcp_url) entries = client.search_feeds_parsed(keyword, sort_by="最多收藏") if not entries: entries = client.list_feeds_parsed() if not entries: _record_error() return "❌ 未找到任何笔记" my_uid = cfg.get("my_user_id", "") entries = [ e for e in entries if e.get("user_id") != my_uid and e.get("feed_id") not in _op_history["favorited_feeds"] ] if not entries: return "ℹ️ 搜索结果中所有笔记都已收藏过" random.shuffle(entries) targets = entries[:min(fav_count, len(entries))] saved = 0 for target in targets: feed_id = target.get("feed_id", "") xsec_token = target.get("xsec_token", "") title = target.get("title", "未知")[:25] if not feed_id or not xsec_token: continue time.sleep(random.uniform(2, 6)) result = client.favorite_feed(feed_id, xsec_token) if "error" in result: _auto_log_append(f" ❌ 收藏失败「{title}」: {result['error']}") else: saved += 1 _op_history["favorited_feeds"].add(feed_id) _increment_stat("favorites") _auto_log_append(f" ⭐ 已收藏「{title}」@{target.get('author', '未知')}") if saved > 0: _clear_error_streak() _auto_log_append(f"⭐ 收藏完成: 成功 {saved}/{len(targets)} (今日累计{_daily_stats.get('favorites', 0)})") return f"✅ 收藏完成!成功 {saved}/{len(targets)} 个\n📊 今日收藏: {_daily_stats.get('favorites', 0)}/{DAILY_LIMITS['favorites']}" except Exception as e: _record_error() _auto_log_append(f"❌ 一键收藏异常: {e}") return f"❌ 收藏失败: {e}" def _auto_publish_with_log(topics_str, mcp_url, sd_url_val, sd_model_name, model, persona_text, quality_mode_val, face_swap_on): """一键发布 + 同步刷新日志""" msg = auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model, persona_text=persona_text, quality_mode_val=quality_mode_val, face_swap_on=face_swap_on) return msg, get_auto_log() def _auto_reply_with_log(max_replies, mcp_url, model, persona_text): """一键回复 + 同步刷新日志""" msg = auto_reply_once(max_replies, mcp_url, model, persona_text) return msg, get_auto_log() def auto_reply_once(max_replies, mcp_url, model, persona_text): """一键回复:获取我的笔记 → 加载评论 → AI 生成回复 → 发送(含防重复/限额)""" try: if _is_in_cooldown(): return "⏳ 错误冷却中,请稍后再试" if not _check_daily_limit("replies"): return f"🚫 今日回复已达上限 ({DAILY_LIMITS['replies']})" persona_text = _resolve_persona(persona_text) my_uid = cfg.get("my_user_id", "") xsec = cfg.get("xsec_token", "") if not my_uid: return "❌ 未配置用户 ID,请到「账号登录」页填写" if not xsec: return "❌ 未获取 xsec_token,请先登录" api_key, base_url, _ = _get_llm_config() if not api_key: return "❌ LLM 未配置" max_replies = int(max_replies) if max_replies else 3 remaining = DAILY_LIMITS["replies"] - _daily_stats.get("replies", 0) max_replies = min(max_replies, remaining) client = get_mcp_client(mcp_url) _auto_log_append("💌 开始自动回复评论...") # Step 1: 获取我的笔记列表 result = client.get_user_profile(my_uid, xsec) if "error" in result: _auto_log_append(f"❌ 获取我的笔记失败: {result['error']}") return f"❌ 获取我的笔记失败: {result['error']}" # 解析笔记列表 raw = result.get("raw", {}) text = result.get("text", "") data = None if raw and isinstance(raw, dict): for item in raw.get("content", []): if item.get("type") == "text": try: data = json.loads(item["text"]) except (json.JSONDecodeError, KeyError): pass if not data: try: data = json.loads(text) except (json.JSONDecodeError, TypeError): pass feeds = (data or {}).get("feeds") or [] if not feeds: _auto_log_append("⚠️ 未找到任何笔记") return "⚠️ 未找到你的笔记" # 构建笔记条目 my_entries = [] for f in feeds: nc = f.get("noteCard") or {} my_entries.append({ "feed_id": f.get("id", ""), "xsec_token": f.get("xsecToken", ""), "title": nc.get("displayTitle", "未知标题"), }) _auto_log_append(f"📝 找到 {len(my_entries)} 篇笔记,开始扫描评论...") # Step 2: 遍历笔记,找到未回复的评论 total_replied = 0 svc = LLMService(api_key, base_url, model) for entry in my_entries: if total_replied >= max_replies: break feed_id = entry["feed_id"] xsec_token = entry["xsec_token"] title = entry["title"] if not feed_id or not xsec_token: continue time.sleep(random.uniform(1, 3)) # 加载笔记评论(使用结构化接口) comments = client.get_feed_comments(feed_id, xsec_token, load_all=True) if not comments: continue # 过滤掉自己的评论 & 已回复过的评论 other_comments = [ c for c in comments if c.get("user_id") and c["user_id"] != my_uid and c.get("content") and c.get("comment_id", "") not in _op_history["replied_comments"] ] if not other_comments: continue _auto_log_append(f"📖「{title[:20]}」有 {len(other_comments)} 条他人评论") for comment in other_comments: if total_replied >= max_replies: break comment_id = comment.get("comment_id", "") comment_uid = comment.get("user_id", "") comment_text = comment.get("content", "") nickname = comment.get("nickname", "网友") if not comment_text.strip(): continue _auto_log_append(f" 💬 @{nickname}: {comment_text[:40]}...") # AI 生成回复 try: reply = svc.generate_reply(persona_text, title, comment_text) except Exception as e: _auto_log_append(f" ❌ AI 回复生成失败: {e}") continue _auto_log_append(f" 🤖 回复: {reply[:50]}...") # 发送回复 time.sleep(random.uniform(2, 6)) if comment_id and comment_uid: # 使用 reply_comment 精确回复 resp = client.reply_comment( feed_id, xsec_token, comment_id, comment_uid, reply ) else: # 没有 comment_id 就用 post_comment 发到笔记下 resp = client.post_comment(feed_id, xsec_token, f"@{nickname} {reply}") resp_text = resp.get("text", "") if "error" in resp: _auto_log_append(f" ❌ 回复发送失败: {resp['error']}") else: _auto_log_append(f" ✅ 已回复 @{nickname}") total_replied += 1 if comment_id: _op_history["replied_comments"].add(comment_id) _increment_stat("replies") if total_replied > 0: _clear_error_streak() if total_replied == 0: _auto_log_append("ℹ️ 没有找到需要回复的新评论") return "ℹ️ 没有找到需要回复的新评论\n\n💡 可能所有评论都已回复过" else: _auto_log_append(f"✅ 自动回复完成,共回复 {total_replied} 条 (今日累计{_daily_stats.get('replies', 0)})") return f"✅ 自动回复完成!共回复 {total_replied} 条评论\n📊 今日回复: {_daily_stats.get('replies', 0)}/{DAILY_LIMITS['replies']}" except Exception as e: _record_error() _auto_log_append(f"❌ 自动回复异常: {e}") return f"❌ 自动回复失败: {e}" def auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model, persona_text=None, quality_mode_val=None, face_swap_on=False): """一键发布:自动生成文案 → 生成图片 → 本地备份 → 发布到小红书(含限额 + 智能权重 + 人设 + 画质)""" try: if _is_in_cooldown(): return "⏳ 错误冷却中,请稍后再试" if not _check_daily_limit("publishes"): return f"🚫 今日发布已达上限 ({DAILY_LIMITS['publishes']})" topics = [t.strip() for t in topics_str.split(",") if t.strip()] if topics_str else DEFAULT_TOPICS use_weights = cfg.get("use_smart_weights", True) and analytics.has_weights if use_weights: # 智能加权选题 topic = analytics.get_weighted_topic(topics) style = analytics.get_weighted_style(DEFAULT_STYLES) _auto_log_append(f"🧠 [智能] 主题: {topic} | 风格: {style} (加权选择)") else: topic = random.choice(topics) style = random.choice(DEFAULT_STYLES) _auto_log_append(f"📝 主题: {topic} | 风格: {style} (主题池: {len(topics)} 个)") # 生成文案 api_key, base_url, _ = _get_llm_config() if not api_key: return "❌ LLM 未配置,请先在全局设置中配置提供商" svc = LLMService(api_key, base_url, model) # 解析人设(随机/指定) persona = _resolve_persona(persona_text) if persona_text else None if persona: _auto_log_append(f"🎭 人设: {persona[:20]}...") if use_weights: # 使用加权文案生成 (携带权重洞察) weight_insights = f"高权重主题: {', '.join(list(analytics._weights.get('topic_weights', {}).keys())[:5])}\n" weight_insights += f"权重摘要: {analytics.weights_summary}" title_advice = analytics.get_title_advice() hot_tags = ", ".join(analytics.get_top_tags(8)) try: data = svc.generate_weighted_copy(topic, style, weight_insights, title_advice, hot_tags, sd_model_name=sd_model_name, persona=persona) _auto_log_append("🧠 使用智能加权文案模板") except Exception as e: logger.warning("加权文案生成失败, 退回普通模式: %s", e) data = svc.generate_copy(topic, style, sd_model_name=sd_model_name, persona=persona) _auto_log_append("⚠️ 加权模板异常, 使用普通模板") else: data = svc.generate_copy(topic, style, sd_model_name=sd_model_name, persona=persona) title = (data.get("title", "") or "")[:20] content = data.get("content", "") sd_prompt = data.get("sd_prompt", "") tags = data.get("tags", []) # 如果有高权重标签,补充到 tags 中 if use_weights: top_tags = analytics.get_top_tags(5) for t in top_tags: if t not in tags: tags.append(t) tags = tags[:10] # 限制最多10个标签 if not title: _record_error() return "❌ 文案生成失败:无标题" _auto_log_append(f"📄 文案: {title}") # 生成图片 if not sd_url_val or not sd_model_name: return "❌ SD WebUI 未连接或未选择模型,请先在全局设置中连接" sd_svc = SDService(sd_url_val) # 自动发布也支持换脸 face_image = None if face_swap_on: face_image = SDService.load_face_image() if face_image: _auto_log_append("🎭 换脸已启用") else: _auto_log_append("⚠️ 换脸已启用但未找到头像,跳过换脸") images = sd_svc.txt2img(prompt=sd_prompt, model=sd_model_name, face_image=face_image, quality_mode=quality_mode_val or "快速 (约30秒)") if not images: _record_error() return "❌ 图片生成失败:没有返回图片" _auto_log_append(f"🎨 已生成 {len(images)} 张图片") # 本地备份(同时用于发布) ts = int(time.time()) safe_title = re.sub(r'[\\/*?:"<>|]', "", title)[:20] backup_dir = os.path.join(OUTPUT_DIR, f"{ts}_{safe_title}") os.makedirs(backup_dir, exist_ok=True) # 保存文案 with open(os.path.join(backup_dir, "文案.txt"), "w", encoding="utf-8") as f: f.write(f"标题: {title}\n风格: {style}\n主题: {topic}\n\n{content}\n\n标签: {', '.join(tags)}\n\nSD Prompt: {sd_prompt}") image_paths = [] for idx, img in enumerate(images): if isinstance(img, Image.Image): path = os.path.abspath(os.path.join(backup_dir, f"图{idx+1}.jpg")) if img.mode != "RGB": img = img.convert("RGB") img.save(path, format="JPEG", quality=95) image_paths.append(path) if not image_paths: return "❌ 图片保存失败" _auto_log_append(f"💾 本地已备份至: {backup_dir}") # 发布到小红书 client = get_mcp_client(mcp_url) result = client.publish_content( title=title, content=content, images=image_paths, tags=tags ) if "error" in result: _record_error() _auto_log_append(f"❌ 发布失败: {result['error']} (文案已本地保存)") return f"❌ 发布失败: {result['error']}\n💾 文案和图片已备份至: {backup_dir}" _increment_stat("publishes") _clear_error_streak() # 清理 _temp_publish 中的旧临时文件 temp_dir = os.path.join(OUTPUT_DIR, "_temp_publish") try: if os.path.exists(temp_dir): for f in os.listdir(temp_dir): fp = os.path.join(temp_dir, f) if os.path.isfile(fp) and time.time() - os.path.getmtime(fp) > 3600: os.remove(fp) except Exception: pass _auto_log_append(f"🚀 发布成功: {title} (今日第{_daily_stats['publishes']}篇)") return f"✅ 发布成功!\n📌 标题: {title}\n💾 备份: {backup_dir}\n📊 今日发布: {_daily_stats['publishes']}/{DAILY_LIMITS['publishes']}\n{result.get('text', '')}" except Exception as e: _record_error() _auto_log_append(f"❌ 一键发布异常: {e}") return f"❌ 发布失败: {e}" # 调度器下次执行时间追踪 _scheduler_next_times = {} def _scheduler_loop(comment_enabled, publish_enabled, reply_enabled, like_enabled, favorite_enabled, comment_min, comment_max, publish_min, publish_max, reply_min, reply_max, max_replies_per_run, like_min, like_max, like_count_per_run, fav_min, fav_max, fav_count_per_run, op_start_hour, op_end_hour, keywords, topics, mcp_url, sd_url_val, sd_model_name, model, persona_text, quality_mode_val=None, face_swap_on=False): """后台定时调度循环(含运营时段、冷却、收藏、统计)""" _auto_log_append("🤖 自动化调度器已启动") _auto_log_append(f"⏰ 运营时段: {int(op_start_hour)}:00 - {int(op_end_hour)}:00") # 首次执行的随机延迟 next_comment = time.time() + random.randint(10, 60) next_publish = time.time() + random.randint(30, 120) next_reply = time.time() + random.randint(15, 90) next_like = time.time() + random.randint(5, 40) next_favorite = time.time() + random.randint(10, 50) def _update_next_display(): """更新下次执行时间显示""" times = {} if comment_enabled: times["评论"] = datetime.fromtimestamp(next_comment).strftime("%H:%M:%S") if like_enabled: times["点赞"] = datetime.fromtimestamp(next_like).strftime("%H:%M:%S") if favorite_enabled: times["收藏"] = datetime.fromtimestamp(next_favorite).strftime("%H:%M:%S") if reply_enabled: times["回复"] = datetime.fromtimestamp(next_reply).strftime("%H:%M:%S") if publish_enabled: times["发布"] = datetime.fromtimestamp(next_publish).strftime("%H:%M:%S") _scheduler_next_times.update(times) _update_next_display() while _auto_running.is_set(): now = time.time() # 检查运营时段 if not _is_in_operating_hours(int(op_start_hour), int(op_end_hour)): now_hour = datetime.now().hour _auto_log_append(f"😴 当前{now_hour}时,不在运营时段({int(op_start_hour)}-{int(op_end_hour)}),休眠中...") # 休眠到运营时间开始 for _ in range(300): # 5分钟检查一次 if not _auto_running.is_set(): break time.sleep(1) continue # 检查错误冷却 if _is_in_cooldown(): remain = int(_error_cooldown_until - time.time()) if remain > 0 and remain % 30 == 0: _auto_log_append(f"⏳ 错误冷却中,剩余 {remain}s") time.sleep(5) continue # 自动评论 if comment_enabled and now >= next_comment: try: _auto_log_append("--- 🔄 执行自动评论 ---") msg = auto_comment_once(keywords, mcp_url, model, persona_text) _auto_log_append(msg) except Exception as e: _auto_log_append(f"❌ 自动评论异常: {e}") interval = random.randint(int(comment_min) * 60, int(comment_max) * 60) next_comment = time.time() + interval _auto_log_append(f"⏰ 下次评论: {interval // 60} 分钟后") _update_next_display() # 自动点赞 if like_enabled and now >= next_like: try: _auto_log_append("--- 🔄 执行自动点赞 ---") msg = auto_like_once(keywords, like_count_per_run, mcp_url) _auto_log_append(msg) except Exception as e: _auto_log_append(f"❌ 自动点赞异常: {e}") interval = random.randint(int(like_min) * 60, int(like_max) * 60) next_like = time.time() + interval _auto_log_append(f"⏰ 下次点赞: {interval // 60} 分钟后") _update_next_display() # 自动收藏 if favorite_enabled and now >= next_favorite: try: _auto_log_append("--- 🔄 执行自动收藏 ---") msg = auto_favorite_once(keywords, fav_count_per_run, mcp_url) _auto_log_append(msg) except Exception as e: _auto_log_append(f"❌ 自动收藏异常: {e}") interval = random.randint(int(fav_min) * 60, int(fav_max) * 60) next_favorite = time.time() + interval _auto_log_append(f"⏰ 下次收藏: {interval // 60} 分钟后") _update_next_display() # 自动发布 if publish_enabled and now >= next_publish: try: _auto_log_append("--- 🔄 执行自动发布 ---") msg = auto_publish_once(topics, mcp_url, sd_url_val, sd_model_name, model, persona_text=persona_text, quality_mode_val=quality_mode_val, face_swap_on=face_swap_on) _auto_log_append(msg) except Exception as e: _auto_log_append(f"❌ 自动发布异常: {e}") interval = random.randint(int(publish_min) * 60, int(publish_max) * 60) next_publish = time.time() + interval _auto_log_append(f"⏰ 下次发布: {interval // 60} 分钟后") _update_next_display() # 自动回复评论 if reply_enabled and now >= next_reply: try: _auto_log_append("--- 🔄 执行自动回复评论 ---") msg = auto_reply_once(max_replies_per_run, mcp_url, model, persona_text) _auto_log_append(msg) except Exception as e: _auto_log_append(f"❌ 自动回复异常: {e}") interval = random.randint(int(reply_min) * 60, int(reply_max) * 60) next_reply = time.time() + interval _auto_log_append(f"⏰ 下次回复: {interval // 60} 分钟后") _update_next_display() # 每5秒检查一次停止信号 for _ in range(5): if not _auto_running.is_set(): break time.sleep(1) _scheduler_next_times.clear() _auto_log_append("🛑 自动化调度器已停止") def start_scheduler(comment_on, publish_on, reply_on, like_on, favorite_on, c_min, c_max, p_min, p_max, r_min, r_max, max_replies_per_run, l_min, l_max, like_count_per_run, fav_min, fav_max, fav_count_per_run, op_start_hour, op_end_hour, keywords, topics, mcp_url, sd_url_val, sd_model_name, model, persona_text, quality_mode_val, face_swap_on): """启动定时自动化""" global _auto_thread if _auto_running.is_set(): return "⚠️ 调度器已在运行中,请先停止" if not comment_on and not publish_on and not reply_on and not like_on and not favorite_on: return "❌ 请至少启用一项自动化功能" # 评论/回复需要 LLM,点赞/收藏不需要 if (comment_on or reply_on): api_key, _, _ = _get_llm_config() if not api_key: return "❌ LLM 未配置,请先在全局设置中配置提供商" _auto_running.set() _auto_thread = threading.Thread( target=_scheduler_loop, args=(comment_on, publish_on, reply_on, like_on, favorite_on, c_min, c_max, p_min, p_max, r_min, r_max, max_replies_per_run, l_min, l_max, like_count_per_run, fav_min, fav_max, fav_count_per_run, op_start_hour, op_end_hour, keywords, topics, mcp_url, sd_url_val, sd_model_name, model, persona_text), kwargs={"quality_mode_val": quality_mode_val, "face_swap_on": face_swap_on}, daemon=True, ) _auto_thread.start() parts = [] if comment_on: parts.append(f"评论({int(c_min)}-{int(c_max)}分)") if like_on: parts.append(f"点赞({int(l_min)}-{int(l_max)}分, {int(like_count_per_run)}个/轮)") if favorite_on: parts.append(f"收藏({int(fav_min)}-{int(fav_max)}分, {int(fav_count_per_run)}个/轮)") if publish_on: parts.append(f"发布({int(p_min)}-{int(p_max)}分)") if reply_on: parts.append(f"回复({int(r_min)}-{int(r_max)}分, ≤{int(max_replies_per_run)}条/轮)") _auto_log_append(f"调度器已启动: {' + '.join(parts)}") return f"✅ 自动化已启动 🟢\n⏰ 运营时段: {int(op_start_hour)}:00-{int(op_end_hour)}:00\n任务: {' | '.join(parts)}\n\n💡 点击「刷新日志」查看实时进度" def stop_scheduler(): """停止定时自动化""" if not _auto_running.is_set(): return "⚠️ 调度器未在运行" _auto_running.clear() _auto_log_append("⏹️ 收到停止信号,等待当前任务完成...") return "🛑 调度器停止中...当前任务完成后将完全停止" def get_auto_log(): """获取自动化运行日志""" if not _auto_log: return "📋 暂无日志\n\n💡 点击「一键评论」「一键发布」或启动定时后日志将在此显示" return "\n".join(_auto_log[-80:]) def get_scheduler_status(): """获取调度器运行状态 + 下次执行时间 + 今日统计""" _reset_daily_stats_if_needed() if _auto_running.is_set(): lines = ["🟢 **调度器运行中**"] if _scheduler_next_times: next_info = " | ".join(f"{k}@{v}" for k, v in _scheduler_next_times.items()) lines.append(f"⏰ 下次: {next_info}") s = _daily_stats lines.append( f"📊 今日: 💬{s.get('comments',0)} ❤️{s.get('likes',0)} " f"⭐{s.get('favorites',0)} 🚀{s.get('publishes',0)} " f"💌{s.get('replies',0)} ❌{s.get('errors',0)}" ) if _is_in_cooldown(): lines.append(f"⏳ 冷却中,{int(_error_cooldown_until - time.time())}s 后恢复") return "\n".join(lines) return "⚪ **调度器未运行**" # ================================================== # 智能学习 & 笔记分析模块 # ================================================== # 定时学习状态 _learn_running = threading.Event() _learn_thread: threading.Thread | None = None def analytics_collect_data(mcp_url, user_id, xsec_token): """采集笔记表现数据""" if not user_id or not xsec_token: return "❌ 请先填写用户 ID 和 xsec_token (在「账号登录」Tab 获取)" try: client = get_mcp_client(mcp_url) result = analytics.collect_note_performance(client, user_id, xsec_token) if "error" in result: return f"❌ 数据采集失败: {result['error']}" return ( f"✅ 数据采集完成!\n" f"📝 总笔记数: {result['total']}\n" f"🔄 更新: {result['updated']} 篇\n\n" f"💡 点击「计算权重」进行智能学习" ) except Exception as e: logger.error("数据采集失败: %s", e) return f"❌ 采集失败: {e}" def analytics_calculate_weights(): """计算内容权重""" try: result = analytics.calculate_weights() if "error" in result: return "❌ " + result["error"], analytics.generate_report() top = result.get("top_note") top_str = f" | 🏆 最佳: {top['title']} (❤️ {top.get('likes', 0)})" if top else "" msg = ( f"✅ 权重计算完成!\n" f"📊 分析了 {result['total_notes']} 篇笔记{top_str}\n\n" f"💡 权重已自动保存,启用「智能加权发布」后自动生效" ) return msg, analytics.generate_report() except Exception as e: logger.error("权重计算失败: %s", e) return f"❌ 计算失败: {e}", "" def analytics_llm_deep_analysis(model): """LLM 深度分析笔记表现""" note_data = analytics.generate_llm_analysis_prompt() if not note_data: return "❌ 暂无笔记数据,请先采集" try: api_key, base_url, _ = _get_llm_config() if not api_key: return "❌ LLM 未配置" svc = LLMService(api_key, base_url, model) result = svc.analyze_note_performance(note_data) lines = ["## 🧠 AI 深度分析报告\n"] if result.get("high_perform_features"): lines.append(f"### ✅ 高表现内容特征\n{result['high_perform_features']}\n") if result.get("low_perform_issues"): lines.append(f"### ⚠️ 低表现内容反思\n{result['low_perform_issues']}\n") if result.get("user_preference"): lines.append(f"### 👤 用户偏好画像\n{result['user_preference']}\n") suggestions = result.get("content_suggestions", []) if suggestions: lines.append("### 📌 内容方向建议") for s in suggestions: priority = "🔴" if s.get("priority", 3) <= 2 else "🟡" if s.get("priority", 3) <= 3 else "🟢" lines.append(f"- {priority} **{s.get('topic', '')}**: {s.get('reason', '')}") lines.append("") templates = result.get("title_templates", []) if templates: lines.append("### ✏️ 标题模板") for t in templates: lines.append(f"- 📝 {t}") lines.append("") tags = result.get("recommended_tags", []) if tags: lines.append(f"### 🏷️ 推荐标签\n{' '.join(f'`#{t}`' for t in tags)}\n") return "\n".join(lines) except Exception as e: logger.error("LLM 分析失败: %s", e) return f"❌ AI 分析失败: {e}" def analytics_get_report(): """获取分析报告""" return analytics.generate_report() def analytics_get_weighted_topics(): """获取加权主题列表""" weighted = analytics.get_weighted_topics_display() if weighted: return weighted return "暂无权重数据,请先执行「采集数据 → 计算权重」" def _learn_scheduler_loop(mcp_url, user_id, xsec_token, model, interval_hours): """定时学习后台循环""" logger.info("定时学习已启动, 间隔 %s 小时", interval_hours) _auto_log_append(f"🧠 定时学习已启动, 每 {interval_hours} 小时自动分析一次") while _learn_running.is_set(): try: # 采集数据 client = get_mcp_client(mcp_url) result = analytics.collect_note_performance(client, user_id, xsec_token) if "error" not in result: _auto_log_append(f"🧠 自动采集完成: {result['total']} 篇笔记, 更新 {result['updated']} 篇") # 计算权重 weight_result = analytics.calculate_weights() if "error" not in weight_result: _auto_log_append(f"🧠 权重更新完成: 分析 {weight_result['total_notes']} 篇") # LLM 深度分析 (如果有配置) api_key, base_url, _ = _get_llm_config() if api_key and model: try: note_data = analytics.generate_llm_analysis_prompt() if note_data: svc = LLMService(api_key, base_url, model) svc.analyze_note_performance(note_data) _auto_log_append("🧠 AI 深度分析完成") except Exception as e: _auto_log_append(f"⚠️ AI 分析失败 (非致命): {e}") else: _auto_log_append(f"⚠️ 自动采集失败: {result.get('error', '未知')}") except Exception as e: _auto_log_append(f"⚠️ 定时学习异常: {e}") # 等待下一次执行 wait_seconds = interval_hours * 3600 for _ in range(int(wait_seconds / 5)): if not _learn_running.is_set(): break time.sleep(5) logger.info("定时学习已停止") _auto_log_append("🧠 定时学习已停止") def start_learn_scheduler(mcp_url, user_id, xsec_token, model, interval_hours): """启动定时学习""" global _learn_thread if _learn_running.is_set(): return "⚠️ 定时学习已在运行中" if not user_id or not xsec_token: return "❌ 请先在「账号登录」获取用户 ID 和 Token" _learn_running.set() _learn_thread = threading.Thread( target=_learn_scheduler_loop, args=(mcp_url, user_id, xsec_token, model, interval_hours), daemon=True, ) _learn_thread.start() return f"✅ 定时学习已启动 🧠 每 {int(interval_hours)} 小时自动分析" def stop_learn_scheduler(): """停止定时学习""" if not _learn_running.is_set(): return "⚠️ 定时学习未在运行" _learn_running.clear() return "🛑 定时学习已停止" # ================================================== # Windows 开机自启管理 # ================================================== _APP_NAME = "XHS_AI_AutoBot" _STARTUP_REG_KEY = r"Software\Microsoft\Windows\CurrentVersion\Run" def _get_startup_script_path() -> str: """获取启动脚本路径(.vbs 静默启动,不弹黑窗)""" return os.path.join(os.path.dirname(os.path.abspath(__file__)), "_autostart.vbs") def _get_startup_bat_path() -> str: """获取启动 bat 路径""" return os.path.join(os.path.dirname(os.path.abspath(__file__)), "_autostart.bat") def _create_startup_scripts(): """创建静默启动脚本(bat + vbs)""" app_dir = os.path.dirname(os.path.abspath(__file__)) venv_python = os.path.join(app_dir, ".venv", "Scripts", "pythonw.exe") # 如果没有 pythonw,退回 python.exe if not os.path.exists(venv_python): venv_python = os.path.join(app_dir, ".venv", "Scripts", "python.exe") main_script = os.path.join(app_dir, "main.py") # 创建 bat bat_path = _get_startup_bat_path() bat_content = f"""@echo off cd /d "{app_dir}" "{venv_python}" "{main_script}" """ with open(bat_path, "w", encoding="utf-8") as f: f.write(bat_content) # 创建 vbs(静默运行 bat,不弹出命令行窗口) vbs_path = _get_startup_script_path() vbs_content = f"""Set WshShell = CreateObject("WScript.Shell") WshShell.Run chr(34) & "{bat_path}" & chr(34), 0 Set WshShell = Nothing """ with open(vbs_path, "w", encoding="utf-8") as f: f.write(vbs_content) return vbs_path def is_autostart_enabled() -> bool: """检查是否已设置开机自启""" if platform.system() != "Windows": return False try: import winreg key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, _STARTUP_REG_KEY, 0, winreg.KEY_READ) try: val, _ = winreg.QueryValueEx(key, _APP_NAME) winreg.CloseKey(key) return bool(val) except FileNotFoundError: winreg.CloseKey(key) return False except Exception: return False def enable_autostart() -> str: """启用 Windows 开机自启""" if platform.system() != "Windows": return "❌ 此功能仅支持 Windows 系统" try: import winreg vbs_path = _create_startup_scripts() key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, _STARTUP_REG_KEY, 0, winreg.KEY_SET_VALUE) # 用 wscript 运行 vbs 以实现静默启动 winreg.SetValueEx(key, _APP_NAME, 0, winreg.REG_SZ, f'wscript.exe "{vbs_path}"') winreg.CloseKey(key) logger.info(f"开机自启已启用: {vbs_path}") return "✅ 开机自启已启用\n下次开机时将自动后台运行本程序" except Exception as e: logger.error(f"设置开机自启失败: {e}") return f"❌ 设置失败: {e}" def disable_autostart() -> str: """禁用 Windows 开机自启""" if platform.system() != "Windows": return "❌ 此功能仅支持 Windows 系统" try: import winreg key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, _STARTUP_REG_KEY, 0, winreg.KEY_SET_VALUE) try: winreg.DeleteValue(key, _APP_NAME) except FileNotFoundError: pass winreg.CloseKey(key) # 清理启动脚本 for f in [_get_startup_script_path(), _get_startup_bat_path()]: if os.path.exists(f): os.remove(f) logger.info("开机自启已禁用") return "✅ 开机自启已禁用" except Exception as e: logger.error(f"禁用开机自启失败: {e}") return f"❌ 禁用失败: {e}" def toggle_autostart(enabled: bool) -> str: """切换开机自启状态(供 UI 调用)""" if enabled: return enable_autostart() else: return disable_autostart() # ================================================== # UI 构建 # ================================================== config = cfg.all with gr.Blocks( title="小红书 AI 爆文工坊 V2.0", theme=gr.themes.Soft(), css=""" .status-ok { color: #16a34a; font-weight: bold; } .status-err { color: #dc2626; font-weight: bold; } footer { display: none !important; } """, ) as app: gr.Markdown( "# 🍒 小红书 AI 爆文生产工坊 V2.0\n" "> 灵感 → 文案 → 绘图 → 发布 → 运营,一站式全闭环" ) # 全局状态 state_images = gr.State([]) state_search_result = gr.State("") # ============ 全局设置栏 ============ with gr.Accordion("⚙️ 全局设置 (自动保存)", open=False): gr.Markdown("#### 🤖 LLM 提供商 (支持所有 OpenAI 兼容接口)") with gr.Row(): llm_provider = gr.Dropdown( label="选择 LLM 提供商", choices=cfg.get_llm_provider_names(), value=cfg.get("active_llm", ""), interactive=True, scale=2, ) btn_connect_llm = gr.Button("🔗 连接 LLM", size="sm", scale=1) with gr.Row(): llm_model = gr.Dropdown( label="LLM 模型", value=config["model"], allow_custom_value=True, interactive=True, scale=2, ) llm_provider_info = gr.Markdown( value="*选择提供商后显示详情*", ) with gr.Accordion("➕ 添加 / 管理 LLM 提供商", open=False): with gr.Row(): new_provider_name = gr.Textbox( label="名称", placeholder="如: DeepSeek / GPT-4o / 通义千问", scale=1, ) new_provider_key = gr.Textbox( label="API Key", type="password", scale=2, ) new_provider_url = gr.Textbox( label="Base URL", placeholder="https://api.openai.com/v1", value="https://api.openai.com/v1", scale=2, ) with gr.Row(): btn_add_provider = gr.Button("✅ 添加提供商", variant="primary", size="sm") btn_del_provider = gr.Button("🗑️ 删除当前提供商", variant="stop", size="sm") provider_mgmt_status = gr.Markdown("") gr.Markdown("---") with gr.Row(): mcp_url = gr.Textbox( label="MCP Server URL", value=config["mcp_url"], scale=2, ) sd_url = gr.Textbox( label="SD WebUI URL", value=config["sd_url"], scale=2, ) with gr.Row(): persona = gr.Dropdown( label="博主人设(评论/回复/自动运营通用)", choices=[RANDOM_PERSONA_LABEL] + DEFAULT_PERSONAS, value=config.get("persona", RANDOM_PERSONA_LABEL), allow_custom_value=True, interactive=True, scale=5, ) with gr.Row(): btn_connect_sd = gr.Button("🎨 连接 SD", size="sm") btn_check_mcp = gr.Button("📡 检查 MCP", size="sm") with gr.Row(): sd_model = gr.Dropdown( label="SD 模型", allow_custom_value=True, interactive=True, scale=2, ) sd_model_info = gr.Markdown("选择模型后显示适配信息", elem_id="sd_model_info") status_bar = gr.Markdown("🔄 等待连接...") gr.Markdown("---") gr.Markdown("#### 🎭 AI 换脸 (ReActor)") gr.Markdown( "> 上传你的头像,生成含人物的图片时自动替换为你的脸\n" "> 需要 SD WebUI 已安装 [ReActor](https://github.com/Gourieff/sd-webui-reactor) 扩展" ) with gr.Row(): face_image_input = gr.Image( label="上传头像 (正面清晰照片效果最佳)", type="pil", height=180, scale=1, ) face_image_preview = gr.Image( label="当前头像", type="pil", height=180, interactive=False, value=SDService.load_face_image(), scale=1, ) with gr.Row(): btn_save_face = gr.Button("💾 保存头像", variant="primary", size="sm") face_swap_toggle = gr.Checkbox( label="🎭 生成图片时启用 AI 换脸", value=os.path.isfile(FACE_IMAGE_PATH), interactive=True, ) face_status = gr.Markdown( "✅ 头像已就绪" if os.path.isfile(FACE_IMAGE_PATH) else "ℹ️ 尚未设置头像" ) gr.Markdown("---") gr.Markdown("#### 🖥️ 系统设置") with gr.Row(): autostart_toggle = gr.Checkbox( label="🚀 Windows 开机自启(静默后台运行)", value=is_autostart_enabled(), interactive=(platform.system() == "Windows"), ) autostart_status = gr.Markdown( value="✅ 已启用" if is_autostart_enabled() else "⚪ 未启用", ) # ============ Tab 页面 ============ with gr.Tabs(): # -------- Tab 1: 内容创作 -------- with gr.Tab("✨ 内容创作"): with gr.Row(): # 左栏:输入 with gr.Column(scale=1): gr.Markdown("### 💡 构思") topic = gr.Textbox(label="笔记主题", placeholder="例如:优衣库早春穿搭") style = gr.Dropdown( DEFAULT_STYLES, label="风格", value="好物种草", ) btn_gen_copy = gr.Button("✨ 第一步:生成文案", variant="primary") gr.Markdown("---") gr.Markdown("### 🎨 绘图参数") quality_mode = gr.Radio( SD_PRESET_NAMES, label="生成模式", value="标准 (约1分钟)", info="快速≈30s 标准≈1min 精细≈2-3min (SDXL)", ) with gr.Accordion("高级设置 (覆盖预设)", open=False): neg_prompt = gr.Textbox( label="反向提示词", value=DEFAULT_NEGATIVE, lines=2, ) steps = gr.Slider(8, 50, value=20, step=1, label="步数") cfg_scale = gr.Slider(1, 15, value=5.5, step=0.5, label="CFG Scale") btn_gen_img = gr.Button("🎨 第二步:生成图片", variant="primary") # 中栏:文案编辑 with gr.Column(scale=1): gr.Markdown("### 📝 文案编辑") res_title = gr.Textbox(label="标题 (≤20字)", interactive=True) res_content = gr.TextArea( label="正文 (可手动修改)", lines=12, interactive=True, ) res_prompt = gr.TextArea( label="绘图提示词", lines=3, interactive=True, ) res_tags = gr.Textbox( label="话题标签 (逗号分隔)", interactive=True, placeholder="穿搭, 春季, 好物种草", ) # 右栏:预览 & 发布 with gr.Column(scale=1): gr.Markdown("### 🖼️ 视觉预览") gallery = gr.Gallery(label="AI 生成图片", columns=2, height=300) local_images = gr.File( label="📁 上传本地图片(可混排)", file_count="multiple", file_types=["image"], ) gr.Markdown("### 🚀 发布") schedule_time = gr.Textbox( label="定时发布 (可选, ISO8601格式)", placeholder="如 2026-02-08T18:00:00+08:00,留空=立即发布", ) with gr.Row(): btn_export = gr.Button("📂 导出本地", variant="secondary") btn_publish = gr.Button("🚀 发布到小红书", variant="primary") publish_msg = gr.Markdown("") # -------- Tab 2: 热点探测 -------- with gr.Tab("🔥 热点探测"): gr.Markdown("### 搜索热门内容 → AI 分析趋势 → 一键借鉴创作") with gr.Row(): with gr.Column(scale=1): hot_keyword = gr.Textbox( label="搜索关键词", placeholder="如:春季穿搭", ) hot_sort = gr.Dropdown( ["综合", "最新", "最多点赞", "最多评论", "最多收藏"], label="排序", value="综合", ) btn_search = gr.Button("🔍 搜索", variant="primary") search_status = gr.Markdown("") with gr.Column(scale=2): search_output = gr.TextArea( label="搜索结果", lines=12, interactive=False, ) with gr.Row(): btn_analyze = gr.Button("🧠 AI 分析热点趋势", variant="primary") analysis_status = gr.Markdown("") analysis_output = gr.Markdown(label="分析报告") topic_from_hot = gr.Textbox( label="选择/输入创作选题", placeholder="基于分析选一个方向", ) with gr.Row(): hot_style = gr.Dropdown( ["好物种草", "干货教程", "情绪共鸣", "生活Vlog", "测评避雷"], label="风格", value="好物种草", ) btn_gen_from_hot = gr.Button("✨ 基于热点生成文案", variant="primary") with gr.Row(): hot_title = gr.Textbox(label="生成的标题", interactive=True) hot_content = gr.TextArea(label="生成的正文", lines=8, interactive=True) with gr.Row(): hot_prompt = gr.TextArea(label="绘图提示词", lines=3, interactive=True) hot_tags = gr.Textbox(label="标签", interactive=True) hot_gen_status = gr.Markdown("") btn_sync_to_create = gr.Button( "📋 同步到「内容创作」Tab → 绘图 & 发布", variant="primary", ) # -------- Tab 3: 评论管家 -------- with gr.Tab("💬 评论管家"): gr.Markdown("### 智能评论管理:主动评论引流 & 自动回复粉丝") with gr.Tabs(): # ======== 子 Tab A: 主动评论他人 ======== with gr.Tab("✍️ 主动评论引流"): gr.Markdown( "> **流程**:搜索/浏览笔记 → 选择目标 → 加载内容 → " "AI 分析笔记+已有评论自动生成高质量评论 → 一键发送" ) # 笔记选择器 with gr.Row(): pro_keyword = gr.Textbox( label="🔍 搜索关键词 (留空则获取推荐)", placeholder="穿搭、美食、旅行…", ) btn_pro_fetch = gr.Button("🔍 获取笔记", variant="primary") with gr.Row(): pro_selector = gr.Dropdown( label="📋 选择目标笔记", choices=[], interactive=True, ) pro_fetch_status = gr.Markdown("") # 隐藏字段 with gr.Row(): pro_feed_id = gr.Textbox(label="笔记 ID", interactive=False) pro_xsec_token = gr.Textbox(label="xsec_token", interactive=False) pro_title = gr.Textbox(label="标题", interactive=False) # 加载内容 & AI 分析 btn_pro_load = gr.Button("📖 加载笔记内容", variant="secondary") pro_load_status = gr.Markdown("") with gr.Row(): with gr.Column(scale=1): pro_content = gr.TextArea( label="📄 笔记正文摘要", lines=8, interactive=False, ) with gr.Column(scale=1): pro_comments = gr.TextArea( label="💬 已有评论", lines=8, interactive=False, ) # 隐藏: 完整文本 pro_full_text = gr.Textbox(visible=False) gr.Markdown("---") with gr.Row(): with gr.Column(scale=1): btn_pro_ai = gr.Button( "🤖 AI 智能生成评论", variant="primary", size="lg", ) pro_ai_status = gr.Markdown("") with gr.Column(scale=2): pro_comment_text = gr.TextArea( label="✏️ 评论内容 (可手动修改)", lines=3, interactive=True, placeholder="点击左侧按钮自动生成,也可手动编写", ) with gr.Row(): btn_pro_send = gr.Button("📩 发送评论", variant="primary") pro_send_status = gr.Markdown("") # ======== 子 Tab B: 回复我的评论 ======== with gr.Tab("💌 回复粉丝评论"): gr.Markdown( "> **流程**:选择我的笔记 → 加载评论 → " "粘贴要回复的评论 → AI 生成回复 → 一键发送" ) # 笔记选择器 (自动用已保存的 userId 获取) with gr.Row(): btn_my_fetch = gr.Button("🔍 获取我的笔记", variant="primary") with gr.Row(): my_selector = gr.Dropdown( label="📋 选择我的笔记", choices=[], interactive=True, ) my_fetch_status = gr.Markdown("") with gr.Row(): my_feed_id = gr.Textbox(label="笔记 ID", interactive=False) my_xsec_token = gr.Textbox(label="xsec_token", interactive=False) my_title = gr.Textbox(label="笔记标题", interactive=False) btn_my_load_comments = gr.Button("📥 加载评论", variant="primary") my_comment_status = gr.Markdown("") my_comments_display = gr.TextArea( label="📋 粉丝评论列表", lines=12, interactive=False, ) gr.Markdown("---") gr.Markdown("#### 📝 回复评论") with gr.Row(): with gr.Column(scale=1): my_target_comment = gr.TextArea( label="要回复的评论内容", lines=3, placeholder="从上方评论列表中复制粘贴要回复的评论", ) btn_my_ai_reply = gr.Button( "🤖 AI 生成回复", variant="secondary", ) my_reply_gen_status = gr.Markdown("") with gr.Column(scale=1): my_reply_content = gr.TextArea( label="回复内容 (可修改)", lines=3, interactive=True, ) btn_my_send_reply = gr.Button( "📩 发送回复", variant="primary", ) my_reply_status = gr.Markdown("") # -------- Tab 4: 账号登录 -------- with gr.Tab("🔐 账号登录"): gr.Markdown( "### 小红书账号登录\n" "> 扫码登录后自动获取 xsec_token,配合用户 ID 即可使用所有功能" ) with gr.Row(): with gr.Column(scale=1): gr.Markdown( "**操作步骤:**\n" "1. 确保 MCP 服务已启动\n" "2. 点击「获取登录二维码」→ 用小红书 App 扫码\n" "3. 点击「检查登录状态」→ 自动获取并保存 xsec_token\n" "4. 首次使用请填写你的用户 ID 并点击保存\n\n" "⚠️ 登录后不要在其他网页端登录同一账号,否则会被踢出" ) btn_get_qrcode = gr.Button( "📱 获取登录二维码", variant="primary", size="lg", ) btn_check_login = gr.Button( "🔍 检查登录状态 (自动获取 Token)", variant="secondary", size="lg", ) btn_logout = gr.Button( "🚪 退出登录 (重新扫码)", variant="stop", size="lg", ) login_status = gr.Markdown("🔄 等待操作...") gr.Markdown("---") gr.Markdown( "#### 📌 我的账号信息\n" "> **注意**: 小红书号 ≠ 用户 ID\n" "> - **小红书号 (redId)**: 如 `18688457507`,是你在 App 个人页看到的\n" "> - **用户 ID (userId)**: 如 `5a695db6e8ac2b72e8af2a53`,24位十六进制字符串\n\n" "💡 **如何获取 userId?**\n" "1. 用浏览器打开你的小红书主页\n" "2. 网址格式为: `xiaohongshu.com/user/profile/xxxxxxxx`\n" "3. `profile/` 后面的就是你的 userId" ) login_user_id = gr.Textbox( label="我的用户 ID (24位 userId, 非小红书号)", value=config.get("my_user_id", ""), placeholder="如: 5a695db6e8ac2b72e8af2a53", ) login_xsec_token = gr.Textbox( label="xsec_token (登录后自动获取)", value=config.get("xsec_token", ""), interactive=False, ) btn_save_uid = gr.Button( "💾 保存用户 ID", variant="secondary", ) save_uid_status = gr.Markdown("") with gr.Column(scale=1): qr_image = gr.Image( label="扫码登录", height=350, width=350, ) # -------- Tab 5: 数据看板 -------- with gr.Tab("📊 数据看板"): gr.Markdown( "### 我的账号数据看板\n" "> 用户 ID 和 xsec_token 从「账号登录」自动获取,直接点击加载即可" ) with gr.Row(): with gr.Column(scale=1): data_user_id = gr.Textbox( label="我的用户 ID (自动填充)", value=config.get("my_user_id", ""), interactive=True, ) data_xsec_token = gr.Textbox( label="xsec_token (自动填充)", value=config.get("xsec_token", ""), interactive=True, ) btn_refresh_token = gr.Button( "🔄 刷新 Token", variant="secondary", ) btn_load_my_data = gr.Button( "📊 加载我的数据", variant="primary", size="lg", ) data_status = gr.Markdown("") with gr.Column(scale=2): profile_card = gr.Markdown( value="*等待加载...*", label="账号概览", ) gr.Markdown("---") gr.Markdown("### 📈 数据可视化") with gr.Row(): with gr.Column(scale=1): chart_interact = gr.Plot(label="📊 核心指标") with gr.Column(scale=2): chart_notes = gr.Plot(label="❤ 笔记点赞排行") gr.Markdown("---") notes_detail = gr.Markdown( value="*加载数据后显示笔记明细表格*", label="笔记数据明细", ) # -------- Tab 6: 智能学习 -------- with gr.Tab("🧠 智能学习"): gr.Markdown( "### 🧠 智能内容学习引擎\n" "> 自动分析已发布笔记的表现,学习哪些内容受欢迎,用权重指导未来创作\n\n" "**工作流程**: 采集数据 → 计算权重 → AI 深度分析 → 自动优化创作\n\n" "💡 启用后,自动发布将优先生成高权重主题的内容" ) with gr.Row(): # 左栏: 数据采集 & 权重计算 with gr.Column(scale=1): gr.Markdown("#### 📊 数据采集") learn_user_id = gr.Textbox( label="用户 ID", value=config.get("my_user_id", ""), interactive=True, ) learn_xsec_token = gr.Textbox( label="xsec_token", value=config.get("xsec_token", ""), interactive=True, ) btn_learn_collect = gr.Button( "📊 采集笔记数据", variant="primary", size="lg", ) learn_collect_status = gr.Markdown("") gr.Markdown("---") gr.Markdown("#### ⚖️ 权重计算") btn_learn_calc = gr.Button( "⚖️ 计算内容权重", variant="primary", size="lg", ) learn_calc_status = gr.Markdown("") gr.Markdown("---") gr.Markdown("#### 🤖 AI 深度分析") gr.Markdown("> 用 LLM 分析笔记数据,找出内容规律,生成策略建议") btn_learn_ai = gr.Button( "🧠 AI 深度分析", variant="primary", size="lg", ) gr.Markdown("---") gr.Markdown("#### ⏰ 定时自动学习") gr.Markdown("> 每隔 N 小时自动采集数据 + 计算权重 + AI 分析") learn_interval = gr.Number( label="学习间隔 (小时)", value=6, minimum=1, maximum=48, ) with gr.Row(): btn_learn_start = gr.Button( "▶ 启动定时学习", variant="primary", size="sm", ) btn_learn_stop = gr.Button( "⏹ 停止", variant="stop", size="sm", ) learn_sched_status = gr.Markdown("⚪ 定时学习未启动") gr.Markdown("---") gr.Markdown("#### 🎯 加权主题预览") gr.Markdown("> 当前权重最高的主题 (自动发布会优先选择)") btn_show_topics = gr.Button("🔄 刷新加权主题", size="sm") learn_weighted_topics = gr.Textbox( label="加权主题池 (权重从高到低)", value=analytics.get_weighted_topics_display() or "暂无权重数据", interactive=False, lines=2, ) learn_use_weights = gr.Checkbox( label="🧠 自动发布时使用智能权重 (推荐)", value=cfg.get("use_smart_weights", True), interactive=True, ) # 右栏: 分析报告 with gr.Column(scale=2): gr.Markdown("#### 📋 智能学习报告") learn_report = gr.Markdown( value=analytics.generate_report(), label="分析报告", ) gr.Markdown("---") learn_ai_report = gr.Markdown( value="*点击「AI 深度分析」生成*", label="AI 深度分析报告", ) # -------- Tab 7: 自动运营 -------- with gr.Tab("🤖 自动运营"): gr.Markdown( "### 🤖 无人值守自动化运营\n" "> 一键评论引流 + 一键点赞 + 一键收藏 + 一键回复 + 一键发布 + 随机定时全自动\n\n" "⚠️ **注意**: 请确保已连接 LLM、SD WebUI 和 MCP 服务" ) persona_pool_hint = gr.Markdown( value=f"🎭 当前人设池: **{config.get('persona', '随机')[:20]}** → 关键词/主题池已匹配", ) with gr.Row(): # 左栏: 一键操作 with gr.Column(scale=1): gr.Markdown("#### 💬 一键智能评论") gr.Markdown( "> 自动搜索高赞笔记 → AI 分析内容 → 生成评论 → 发送\n" "每次随机选关键词搜索,从结果中随机选笔记" ) auto_comment_keywords = gr.Textbox( label="评论关键词池 (逗号分隔,随人设自动切换)", value=", ".join(get_persona_keywords(config.get("persona", ""))), placeholder="关键词1, 关键词2, ... (切换人设自动更新)", ) btn_auto_comment = gr.Button( "💬 一键评论 (单次)", variant="primary", size="lg", ) auto_comment_result = gr.Markdown("") gr.Markdown("---") gr.Markdown("#### 👍 一键自动点赞") gr.Markdown( "> 搜索笔记 → 随机选择多篇 → 依次点赞\n" "提升账号活跃度,无需 LLM" ) auto_like_count = gr.Number( label="单次点赞数量", value=5, minimum=1, maximum=20, ) btn_auto_like = gr.Button( "👍 一键点赞 (单次)", variant="primary", size="lg", ) auto_like_result = gr.Markdown("") gr.Markdown("---") gr.Markdown("#### ⭐ 一键自动收藏") gr.Markdown( "> 搜索笔记 → 随机选择多篇 → 依次收藏\n" "提升账号活跃度,与点赞互补" ) auto_fav_count = gr.Number( label="单次收藏数量", value=3, minimum=1, maximum=15, ) btn_auto_favorite = gr.Button( "⭐ 一键收藏 (单次)", variant="primary", size="lg", ) auto_favorite_result = gr.Markdown("") gr.Markdown("---") gr.Markdown("#### 💌 一键自动回复") gr.Markdown( "> 扫描我的所有笔记 → 找到粉丝评论 → AI 生成回复 → 逐条发送\n" "自动跳过自己的评论,模拟真人间隔回复" ) auto_reply_max = gr.Number( label="单次最多回复条数", value=5, minimum=1, maximum=20, ) btn_auto_reply = gr.Button( "💌 一键回复 (单次)", variant="primary", size="lg", ) auto_reply_result = gr.Markdown("") gr.Markdown("---") gr.Markdown("#### 🚀 一键智能发布") gr.Markdown( "> 随机选主题+风格 → AI 生成文案 → SD 生成图片 → 自动发布" ) auto_publish_topics = gr.Textbox( label="主题池 (逗号分隔,随人设自动切换)", value=", ".join(get_persona_topics(config.get("persona", ""))), placeholder="主题会从池中随机选取,切换人设自动更新", ) btn_auto_publish = gr.Button( "🚀 一键发布 (单次)", variant="primary", size="lg", ) auto_publish_result = gr.Markdown("") # 右栏: 定时自动化 with gr.Column(scale=1): gr.Markdown("#### ⏰ 随机定时自动化") gr.Markdown( "> 设置时间间隔后启动,系统将在随机时间自动执行\n" "> 模拟真人操作节奏,降低被检测风险" ) sched_status = gr.Markdown("⚪ **调度器未运行**") # 运营时段设置 with gr.Group(): gr.Markdown("##### ⏰ 运营时段") with gr.Row(): sched_start_hour = gr.Number( label="开始时间(整点)", value=8, minimum=0, maximum=23, ) sched_end_hour = gr.Number( label="结束时间(整点)", value=23, minimum=1, maximum=24, ) with gr.Group(): sched_comment_on = gr.Checkbox( label="✅ 启用自动评论", value=True, ) with gr.Row(): sched_c_min = gr.Number( label="评论最小间隔(分钟)", value=15, minimum=5, ) sched_c_max = gr.Number( label="评论最大间隔(分钟)", value=45, minimum=10, ) with gr.Group(): sched_like_on = gr.Checkbox( label="✅ 启用自动点赞", value=True, ) with gr.Row(): sched_l_min = gr.Number( label="点赞最小间隔(分钟)", value=10, minimum=3, ) sched_l_max = gr.Number( label="点赞最大间隔(分钟)", value=30, minimum=5, ) sched_like_count = gr.Number( label="每轮点赞数量", value=5, minimum=1, maximum=15, ) with gr.Group(): sched_fav_on = gr.Checkbox( label="✅ 启用自动收藏", value=True, ) with gr.Row(): sched_fav_min = gr.Number( label="收藏最小间隔(分钟)", value=12, minimum=3, ) sched_fav_max = gr.Number( label="收藏最大间隔(分钟)", value=35, minimum=5, ) sched_fav_count = gr.Number( label="每轮收藏数量", value=3, minimum=1, maximum=10, ) with gr.Group(): sched_reply_on = gr.Checkbox( label="✅ 启用自动回复评论", value=True, ) with gr.Row(): sched_r_min = gr.Number( label="回复最小间隔(分钟)", value=20, minimum=5, ) sched_r_max = gr.Number( label="回复最大间隔(分钟)", value=60, minimum=10, ) sched_reply_max = gr.Number( label="每轮最多回复条数", value=3, minimum=1, maximum=10, ) with gr.Group(): sched_publish_on = gr.Checkbox( label="✅ 启用自动发布", value=True, ) with gr.Row(): sched_p_min = gr.Number( label="发布最小间隔(分钟)", value=60, minimum=30, ) sched_p_max = gr.Number( label="发布最大间隔(分钟)", value=180, minimum=60, ) with gr.Row(): btn_start_sched = gr.Button( "▶️ 启动定时", variant="primary", size="lg", ) btn_stop_sched = gr.Button( "⏹️ 停止定时", variant="stop", size="lg", ) sched_result = gr.Markdown("") gr.Markdown("---") with gr.Row(): with gr.Column(scale=2): gr.Markdown("#### 📋 运行日志") with gr.Row(): btn_refresh_log = gr.Button("🔄 刷新日志", size="sm") btn_clear_log = gr.Button("🗑️ 清空日志", size="sm") btn_refresh_stats = gr.Button("📊 刷新统计", size="sm") auto_log_display = gr.TextArea( label="自动化运行日志", value="📋 暂无日志\n\n💡 执行操作后日志将在此显示", lines=15, interactive=False, ) with gr.Column(scale=1): gr.Markdown("#### 📊 今日运营统计") auto_stats_display = gr.Markdown( value=_get_stats_summary(), ) # ================================================== # 事件绑定 # ================================================== # ---- 全局设置: LLM 提供商管理 ---- btn_connect_llm.click( fn=connect_llm, inputs=[llm_provider], outputs=[llm_model, status_bar], ) llm_provider.change( fn=on_provider_selected, inputs=[llm_provider], outputs=[llm_provider_info], ) btn_add_provider.click( fn=add_llm_provider, inputs=[new_provider_name, new_provider_key, new_provider_url], outputs=[llm_provider, provider_mgmt_status], ) btn_del_provider.click( fn=remove_llm_provider, inputs=[llm_provider], outputs=[llm_provider, provider_mgmt_status], ) btn_connect_sd.click( fn=connect_sd, inputs=[sd_url], outputs=[sd_model, status_bar, sd_model_info], ) sd_model.change( fn=on_sd_model_change, inputs=[sd_model], outputs=[sd_model_info], ) btn_check_mcp.click( fn=check_mcp_status, inputs=[mcp_url], outputs=[status_bar], ) # ---- 头像/换脸管理 ---- btn_save_face.click( fn=upload_face_image, inputs=[face_image_input], outputs=[face_image_preview, face_status], ) # ---- Tab 1: 内容创作 ---- btn_gen_copy.click( fn=generate_copy, inputs=[llm_model, topic, style, sd_model, persona], outputs=[res_title, res_content, res_prompt, res_tags, status_bar], ) # 生成模式切换 → 同步更新步数/CFG预览 def on_quality_mode_change(mode, sd_model_val): p = get_sd_preset(mode, sd_model_val) return p["steps"], p["cfg_scale"] quality_mode.change( fn=on_quality_mode_change, inputs=[quality_mode, sd_model], outputs=[steps, cfg_scale], ) btn_gen_img.click( fn=generate_images, inputs=[sd_url, res_prompt, neg_prompt, sd_model, steps, cfg_scale, face_swap_toggle, face_image_preview, quality_mode], outputs=[gallery, state_images, status_bar], ) btn_export.click( fn=one_click_export, inputs=[res_title, res_content, state_images], outputs=[publish_msg], ) btn_publish.click( fn=publish_to_xhs, inputs=[res_title, res_content, res_tags, state_images, local_images, mcp_url, schedule_time], outputs=[publish_msg], ) # ---- Tab 2: 热点探测 ---- btn_search.click( fn=search_hotspots, inputs=[hot_keyword, hot_sort, mcp_url], outputs=[search_status, search_output], ) # 搜索结果同步到 state search_output.change( fn=lambda x: x, inputs=[search_output], outputs=[state_search_result], ) btn_analyze.click( fn=analyze_and_suggest, inputs=[llm_model, hot_keyword, search_output], outputs=[analysis_status, analysis_output, topic_from_hot], ) btn_gen_from_hot.click( fn=generate_from_hotspot, inputs=[llm_model, topic_from_hot, hot_style, search_output, sd_model, persona], outputs=[hot_title, hot_content, hot_prompt, hot_tags, hot_gen_status], ) # 同步热点文案到内容创作 Tab btn_sync_to_create.click( fn=lambda t, c, p, tg: (t, c, p, tg, "✅ 已同步到「内容创作」,可切换 Tab 继续绘图和发布"), inputs=[hot_title, hot_content, hot_prompt, hot_tags], outputs=[res_title, res_content, res_prompt, res_tags, status_bar], ) # ---- Tab 3: 评论管家 ---- # == 子 Tab A: 主动评论引流 == btn_pro_fetch.click( fn=fetch_proactive_notes, inputs=[pro_keyword, mcp_url], outputs=[pro_selector, pro_fetch_status], ) pro_selector.change( fn=on_proactive_note_selected, inputs=[pro_selector], outputs=[pro_feed_id, pro_xsec_token, pro_title], ) btn_pro_load.click( fn=load_note_for_comment, inputs=[pro_feed_id, pro_xsec_token, mcp_url], outputs=[pro_load_status, pro_content, pro_comments, pro_full_text], ) btn_pro_ai.click( fn=ai_generate_comment, inputs=[llm_model, persona, pro_title, pro_content, pro_comments], outputs=[pro_comment_text, pro_ai_status], ) btn_pro_send.click( fn=send_comment, inputs=[pro_feed_id, pro_xsec_token, pro_comment_text, mcp_url], outputs=[pro_send_status], ) # == 子 Tab B: 回复粉丝评论 == btn_my_fetch.click( fn=fetch_my_notes, inputs=[mcp_url], outputs=[my_selector, my_fetch_status], ) my_selector.change( fn=on_my_note_selected, inputs=[my_selector], outputs=[my_feed_id, my_xsec_token, my_title], ) btn_my_load_comments.click( fn=fetch_my_note_comments, inputs=[my_feed_id, my_xsec_token, mcp_url], outputs=[my_comment_status, my_comments_display], ) btn_my_ai_reply.click( fn=ai_reply_comment, inputs=[llm_model, persona, my_title, my_target_comment], outputs=[my_reply_content, my_reply_gen_status], ) btn_my_send_reply.click( fn=send_reply, inputs=[my_feed_id, my_xsec_token, my_reply_content, mcp_url], outputs=[my_reply_status], ) # ---- Tab 4: 账号登录 ---- btn_get_qrcode.click( fn=get_login_qrcode, inputs=[mcp_url], outputs=[qr_image, login_status], ) btn_check_login.click( fn=check_login, inputs=[mcp_url], outputs=[login_status, login_user_id, login_xsec_token], ) btn_logout.click( fn=logout_xhs, inputs=[mcp_url], outputs=[login_status], ) btn_save_uid.click( fn=save_my_user_id, inputs=[login_user_id], outputs=[save_uid_status], ) # ---- Tab 5: 数据看板 ---- def refresh_xsec_token(mcp_url): token = _auto_fetch_xsec_token(mcp_url) if token: cfg.set("xsec_token", token) return gr.update(value=token), "✅ Token 已刷新" return gr.update(value=cfg.get("xsec_token", "")), "❌ 刷新失败,请确认已登录" btn_refresh_token.click( fn=refresh_xsec_token, inputs=[mcp_url], outputs=[data_xsec_token, data_status], ) btn_load_my_data.click( fn=fetch_my_profile, inputs=[data_user_id, data_xsec_token, mcp_url], outputs=[data_status, profile_card, chart_interact, chart_notes, notes_detail], ) # ---- Tab 6: 智能学习 ---- btn_learn_collect.click( fn=analytics_collect_data, inputs=[mcp_url, learn_user_id, learn_xsec_token], outputs=[learn_collect_status], ) btn_learn_calc.click( fn=analytics_calculate_weights, inputs=[], outputs=[learn_calc_status, learn_report], ) btn_learn_ai.click( fn=analytics_llm_deep_analysis, inputs=[llm_model], outputs=[learn_ai_report], ) btn_learn_start.click( fn=start_learn_scheduler, inputs=[mcp_url, learn_user_id, learn_xsec_token, llm_model, learn_interval], outputs=[learn_sched_status], ) btn_learn_stop.click( fn=stop_learn_scheduler, inputs=[], outputs=[learn_sched_status], ) btn_show_topics.click( fn=analytics_get_weighted_topics, inputs=[], outputs=[learn_weighted_topics], ) learn_use_weights.change( fn=lambda v: cfg.set("use_smart_weights", v) or ("✅ 智能权重已启用" if v else "⚪ 智能权重已关闭"), inputs=[learn_use_weights], outputs=[learn_sched_status], ) # ---- Tab 7: 自动运营 ---- # 人设切换 → 联动更新评论关键词池和主题池 persona.change( fn=on_persona_changed, inputs=[persona], outputs=[auto_comment_keywords, auto_publish_topics, persona_pool_hint], ) btn_auto_comment.click( fn=_auto_comment_with_log, inputs=[auto_comment_keywords, mcp_url, llm_model, persona], outputs=[auto_comment_result, auto_log_display], ) btn_auto_like.click( fn=_auto_like_with_log, inputs=[auto_comment_keywords, auto_like_count, mcp_url], outputs=[auto_like_result, auto_log_display], ) btn_auto_favorite.click( fn=_auto_favorite_with_log, inputs=[auto_comment_keywords, auto_fav_count, mcp_url], outputs=[auto_favorite_result, auto_log_display], ) btn_auto_reply.click( fn=_auto_reply_with_log, inputs=[auto_reply_max, mcp_url, llm_model, persona], outputs=[auto_reply_result, auto_log_display], ) btn_auto_publish.click( fn=_auto_publish_with_log, inputs=[auto_publish_topics, mcp_url, sd_url, sd_model, llm_model, persona, quality_mode, face_swap_toggle], outputs=[auto_publish_result, auto_log_display], ) btn_start_sched.click( fn=start_scheduler, inputs=[sched_comment_on, sched_publish_on, sched_reply_on, sched_like_on, sched_fav_on, sched_c_min, sched_c_max, sched_p_min, sched_p_max, sched_r_min, sched_r_max, sched_reply_max, sched_l_min, sched_l_max, sched_like_count, sched_fav_min, sched_fav_max, sched_fav_count, sched_start_hour, sched_end_hour, auto_comment_keywords, auto_publish_topics, mcp_url, sd_url, sd_model, llm_model, persona, quality_mode, face_swap_toggle], outputs=[sched_result], ) btn_stop_sched.click( fn=stop_scheduler, inputs=[], outputs=[sched_result], ) btn_refresh_log.click( fn=lambda: (get_auto_log(), get_scheduler_status()), inputs=[], outputs=[auto_log_display, sched_status], ) btn_clear_log.click( fn=lambda: (_auto_log.clear() or "📋 日志已清空"), inputs=[], outputs=[auto_log_display], ) btn_refresh_stats.click( fn=lambda: (get_scheduler_status(), _get_stats_summary()), inputs=[], outputs=[sched_status, auto_stats_display], ) # ---- 开机自启 ---- autostart_toggle.change( fn=toggle_autostart, inputs=[autostart_toggle], outputs=[autostart_status], ) # ---- 启动时自动刷新 SD ---- app.load(fn=connect_sd, inputs=[sd_url], outputs=[sd_model, status_bar]) # ================================================== if __name__ == "__main__": logger.info("🍒 小红书 AI 爆文工坊 V2.0 启动中...") app.launch( server_name=os.environ.get("GRADIO_SERVER_NAME", "127.0.0.1"), server_port=int(os.environ.get("GRADIO_SERVER_PORT", "7860")), inbrowser=True, share=False, )