""" 小红书 AI 爆文生产工坊 V2.0 全自动工作台:灵感 -> 文案 -> 绘图 -> 发布 -> 运营 """ import gradio as gr import os import re import json import time import logging import platform import subprocess from PIL import Image import matplotlib import matplotlib.pyplot as plt from config_manager import ConfigManager, OUTPUT_DIR from llm_service import LLMService from sd_service import SDService, DEFAULT_NEGATIVE from mcp_client import MCPClient, get_mcp_client # ================= matplotlib 中文字体配置 ================= _font_candidates = ["Microsoft YaHei", "SimHei", "PingFang SC", "WenQuanYi Micro Hei"] for _fn in _font_candidates: try: matplotlib.font_manager.findfont(_fn, fallback_to_default=False) plt.rcParams["font.sans-serif"] = [_fn] break except Exception: continue plt.rcParams["axes.unicode_minus"] = False # ================= 日志配置 ================= logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ logging.StreamHandler(), logging.FileHandler("autobot.log", encoding="utf-8"), ], ) logger = logging.getLogger("autobot") # 强制不走代理连接本地服务 os.environ["NO_PROXY"] = "127.0.0.1,localhost" # ================= 全局服务初始化 ================= cfg = ConfigManager() cfg.ensure_workspace() mcp = get_mcp_client(cfg.get("mcp_url", "http://localhost:18060/mcp")) # ================================================== # LLM 多提供商管理 # ================================================== def _get_llm_config() -> tuple[str, str, str]: """获取当前激活 LLM 的 (api_key, base_url, model)""" p = cfg.get_active_llm() if p: return p["api_key"], p["base_url"], cfg.get("model", "") return "", "", "" def connect_llm(provider_name): """连接选中的 LLM 提供商并获取模型列表""" if not provider_name: return gr.update(choices=[], value=None), "⚠️ 请先选择或添加 LLM 提供商" cfg.set_active_llm(provider_name) p = cfg.get_active_llm() if not p: return gr.update(choices=[], value=None), "❌ 未找到该提供商配置" try: svc = LLMService(p["api_key"], p["base_url"]) models = svc.get_models() if models: return ( 
gr.update(choices=models, value=models[0]), f"✅ 已连接「{provider_name}」,加载 {len(models)} 个模型", ) else: # API 无法获取模型列表,保留手动输入 current_model = cfg.get("model", "") return ( gr.update(choices=[current_model] if current_model else [], value=current_model or None), f"⚠️ 已连接「{provider_name}」,但未获取到模型列表,请手动输入模型名", ) except Exception as e: logger.error("LLM 连接失败: %s", e) current_model = cfg.get("model", "") return ( gr.update(choices=[current_model] if current_model else [], value=current_model or None), f"❌ 连接「{provider_name}」失败: {e}", ) def add_llm_provider(name, api_key, base_url): """添加新的 LLM 提供商""" msg = cfg.add_llm_provider(name, api_key, base_url) names = cfg.get_llm_provider_names() active = cfg.get("active_llm", "") return ( gr.update(choices=names, value=active), msg, ) def remove_llm_provider(provider_name): """删除 LLM 提供商""" if not provider_name: return gr.update(choices=cfg.get_llm_provider_names(), value=cfg.get("active_llm", "")), "⚠️ 请先选择要删除的提供商" msg = cfg.remove_llm_provider(provider_name) names = cfg.get_llm_provider_names() active = cfg.get("active_llm", "") return ( gr.update(choices=names, value=active), msg, ) def on_provider_selected(provider_name): """切换 LLM 提供商时更新显示信息""" if not provider_name: return "未选择提供商" for p in cfg.get_llm_providers(): if p["name"] == provider_name: cfg.set_active_llm(provider_name) masked_key = p["api_key"][:8] + "***" if len(p["api_key"]) > 8 else "***" return f"**{provider_name}** \nAPI Key: `{masked_key}` \nBase URL: `{p['base_url']}`" return "未找到该提供商" # ================================================== # Tab 1: 内容创作 # ================================================== def connect_sd(sd_url): """连接 SD 并获取模型列表""" try: svc = SDService(sd_url) ok, msg = svc.check_connection() if ok: models = svc.get_models() cfg.set("sd_url", sd_url) return gr.update(choices=models, value=models[0] if models else None), f"✅ {msg}" return gr.update(choices=[]), f"❌ {msg}" except Exception as e: logger.error("SD 连接失败: %s", e) return 
gr.update(choices=[]), f"❌ SD 连接失败: {e}" def check_mcp_status(mcp_url): """检查 MCP 连接状态""" try: client = get_mcp_client(mcp_url) ok, msg = client.check_connection() if ok: cfg.set("mcp_url", mcp_url) return f"✅ MCP 服务正常 - {msg}" return f"❌ {msg}" except Exception as e: return f"❌ MCP 连接失败: {e}" # ================================================== # 小红书账号登录 # ================================================== def get_login_qrcode(mcp_url): """获取小红书登录二维码""" try: client = get_mcp_client(mcp_url) result = client.get_login_qrcode() if "error" in result: return None, f"❌ 获取二维码失败: {result['error']}" qr_image = result.get("qr_image") msg = result.get("text", "") if qr_image: return qr_image, f"✅ 二维码已生成,请用小红书 App 扫码\n{msg}" return None, f"⚠️ 未获取到二维码图片,MCP 返回:\n{msg}" except Exception as e: logger.error("获取登录二维码失败: %s", e) return None, f"❌ 获取二维码失败: {e}" def logout_xhs(mcp_url): """退出登录:清除 cookies 并重置本地 token""" try: client = get_mcp_client(mcp_url) result = client.delete_cookies() if "error" in result: return f"❌ 退出失败: {result['error']}" cfg.set("xsec_token", "") client._reset() return "✅ 已退出登录,可以重新扫码登录" except Exception as e: logger.error("退出登录失败: %s", e) return f"❌ 退出失败: {e}" def _auto_fetch_xsec_token(mcp_url) -> str: """从推荐列表自动获取一个有效的 xsec_token""" try: client = get_mcp_client(mcp_url) entries = client.list_feeds_parsed() for e in entries: token = e.get("xsec_token", "") if token: return token except Exception as e: logger.warning("自动获取 xsec_token 失败: %s", e) return "" def check_login(mcp_url): """检查登录状态,登录成功后自动获取 xsec_token 并保存""" try: client = get_mcp_client(mcp_url) result = client.check_login_status() if "error" in result: return f"❌ {result['error']}", gr.update(), gr.update() text = result.get("text", "") if "未登录" in text: return f"🔴 {text}", gr.update(), gr.update() # 登录成功 → 自动获取 xsec_token token = _auto_fetch_xsec_token(mcp_url) if token: cfg.set("xsec_token", token) logger.info("自动获取 xsec_token 成功") return ( f"🟢 {text}\n\n✅ xsec_token 已自动获取并保存", 
gr.update(value=cfg.get("my_user_id", "")), gr.update(value=token), ) return f"🟢 {text}\n\n⚠️ 自动获取 xsec_token 失败,请手动刷新", gr.update(), gr.update() except Exception as e: return f"❌ 检查登录状态失败: {e}", gr.update(), gr.update() def save_my_user_id(user_id_input): """保存用户 ID (验证 24 位十六进制格式)""" uid = (user_id_input or "").strip() if not uid: cfg.set("my_user_id", "") return "⚠️ 已清除用户 ID" if not re.match(r'^[0-9a-fA-F]{24}$', uid): return ( "❌ 格式错误!用户 ID 应为 24 位十六进制字符串\n" f"你输入的: `{uid}` ({len(uid)} 位)\n\n" "💡 如果你输入的是小红书号 (纯数字如 18688457507),那不是 userId。" ) cfg.set("my_user_id", uid) return f"✅ 用户 ID 已保存: `{uid}`" def generate_copy(model, topic, style): """生成文案""" api_key, base_url, _ = _get_llm_config() if not api_key: return "", "", "", "", "❌ 请先配置并连接 LLM 提供商" try: svc = LLMService(api_key, base_url, model) data = svc.generate_copy(topic, style) cfg.set("model", model) tags = data.get("tags", []) return ( data.get("title", ""), data.get("content", ""), data.get("sd_prompt", ""), ", ".join(tags) if tags else "", "✅ 文案生成完毕", ) except Exception as e: logger.error("文案生成失败: %s", e) return "", "", "", "", f"❌ 生成失败: {e}" def generate_images(sd_url, prompt, neg_prompt, model, steps, cfg_scale): """生成图片""" if not model: return None, [], "❌ 未选择 SD 模型" try: svc = SDService(sd_url) images = svc.txt2img( prompt=prompt, negative_prompt=neg_prompt, model=model, steps=int(steps), cfg_scale=float(cfg_scale), ) return images, images, f"✅ 生成 {len(images)} 张图片" except Exception as e: logger.error("图片生成失败: %s", e) return None, [], f"❌ 绘图失败: {e}" def one_click_export(title, content, images): """导出文案和图片到本地""" if not title: return "❌ 无法导出:没有标题" safe_title = re.sub(r'[\\/*?:"<>|]', "", title)[:20] folder_name = f"{int(time.time())}_{safe_title}" folder_path = os.path.join(OUTPUT_DIR, folder_name) os.makedirs(folder_path, exist_ok=True) with open(os.path.join(folder_path, "文案.txt"), "w", encoding="utf-8") as f: f.write(f"{title}\n\n{content}") saved_paths = [] if images: for idx, img in 
enumerate(images): path = os.path.join(folder_path, f"图{idx+1}.png") if isinstance(img, Image.Image): img.save(path) saved_paths.append(os.path.abspath(path)) # 尝试打开文件夹 try: abs_path = os.path.abspath(folder_path) if platform.system() == "Windows": os.startfile(abs_path) elif platform.system() == "Darwin": subprocess.call(["open", abs_path]) else: subprocess.call(["xdg-open", abs_path]) except Exception: pass return f"✅ 已导出至: {folder_path} ({len(saved_paths)} 张图片)" def publish_to_xhs(title, content, tags_str, images, local_images, mcp_url, schedule_time): """通过 MCP 发布到小红书""" if not title: return "❌ 缺少标题" client = get_mcp_client(mcp_url) # 收集图片路径 image_paths = [] # 先保存 AI 生成的图片到临时目录 if images: temp_dir = os.path.join(OUTPUT_DIR, "_temp_publish") os.makedirs(temp_dir, exist_ok=True) for idx, img in enumerate(images): if isinstance(img, Image.Image): path = os.path.abspath(os.path.join(temp_dir, f"ai_{idx}.png")) img.save(path) image_paths.append(path) # 添加本地上传的图片 if local_images: for img_file in local_images: # Gradio File 组件返回的是 NamedString 或 tempfile path img_path = img_file.name if hasattr(img_file, 'name') else str(img_file) if os.path.exists(img_path): image_paths.append(os.path.abspath(img_path)) if not image_paths: return "❌ 至少需要 1 张图片才能发布" # 解析标签 tags = [t.strip().lstrip("#") for t in tags_str.split(",") if t.strip()] if tags_str else None # 定时发布 schedule = schedule_time if schedule_time and schedule_time.strip() else None try: result = client.publish_content( title=title, content=content, images=image_paths, tags=tags, schedule_at=schedule, ) if "error" in result: return f"❌ 发布失败: {result['error']}" return f"✅ 发布成功!\n{result.get('text', '')}" except Exception as e: logger.error("发布失败: %s", e) return f"❌ 发布异常: {e}" # ================================================== # Tab 2: 热点探测 # ================================================== def search_hotspots(keyword, sort_by, mcp_url): """搜索小红书热门内容""" if not keyword: return "❌ 请输入搜索关键词", "" try: client = 
get_mcp_client(mcp_url) result = client.search_feeds(keyword, sort_by=sort_by) if "error" in result: return f"❌ 搜索失败: {result['error']}", "" text = result.get("text", "无结果") return "✅ 搜索完成", text except Exception as e: logger.error("热点搜索失败: %s", e) return f"❌ 搜索失败: {e}", "" def analyze_and_suggest(model, keyword, search_result): """AI 分析热点并给出建议""" if not search_result: return "❌ 请先搜索", "", "" api_key, base_url, _ = _get_llm_config() if not api_key: return "❌ 请先配置 LLM 提供商", "", "" try: svc = LLMService(api_key, base_url, model) analysis = svc.analyze_hotspots(search_result) topics = "\n".join(f"• {t}" for t in analysis.get("hot_topics", [])) patterns = "\n".join(f"• {p}" for p in analysis.get("title_patterns", [])) suggestions = "\n".join( f"**{s['topic']}** - {s['reason']}" for s in analysis.get("suggestions", []) ) structure = analysis.get("content_structure", "") summary = ( f"## 🔥 热门选题\n{topics}\n\n" f"## 📝 标题套路\n{patterns}\n\n" f"## 📐 内容结构\n{structure}\n\n" f"## 💡 推荐选题\n{suggestions}" ) return "✅ 分析完成", summary, keyword except Exception as e: logger.error("热点分析失败: %s", e) return f"❌ 分析失败: {e}", "", "" def generate_from_hotspot(model, topic_from_hotspot, style, search_result): """基于热点分析生成文案""" if not topic_from_hotspot: return "", "", "", "", "❌ 请先选择或输入选题" api_key, base_url, _ = _get_llm_config() if not api_key: return "", "", "", "", "❌ 请先配置 LLM 提供商" try: svc = LLMService(api_key, base_url, model) data = svc.generate_copy_with_reference( topic=topic_from_hotspot, style=style, reference_notes=search_result[:2000], # 截断防止超长 ) tags = data.get("tags", []) return ( data.get("title", ""), data.get("content", ""), data.get("sd_prompt", ""), ", ".join(tags), "✅ 基于热点的文案已生成", ) except Exception as e: return "", "", "", "", f"❌ 生成失败: {e}" # ================================================== # Tab 3: 评论管家 # ================================================== # ---- 共用: 笔记列表缓存 ---- # 主动评论缓存 _cached_proactive_entries: list[dict] = [] # 我的笔记评论缓存 _cached_my_note_entries: 
list[dict] = [] def _fetch_and_cache(keyword, mcp_url, cache_name="proactive"): """通用: 获取笔记列表并缓存""" global _cached_proactive_entries, _cached_my_note_entries try: client = get_mcp_client(mcp_url) if keyword and keyword.strip(): entries = client.search_feeds_parsed(keyword.strip()) src = f"搜索「{keyword.strip()}」" else: entries = client.list_feeds_parsed() src = "首页推荐" if cache_name == "proactive": _cached_proactive_entries = entries else: _cached_my_note_entries = entries if not entries: return gr.update(choices=[], value=None), f"⚠️ 从{src}未找到笔记" choices = [] for i, e in enumerate(entries): title_short = (e["title"] or "无标题")[:28] label = f"[{i+1}] {title_short} | @{e['author'] or '未知'} | ❤ {e['likes']}" choices.append(label) return ( gr.update(choices=choices, value=choices[0]), f"✅ 从{src}获取 {len(entries)} 条笔记", ) except Exception as e: if cache_name == "proactive": _cached_proactive_entries = [] else: _cached_my_note_entries = [] return gr.update(choices=[], value=None), f"❌ {e}" def _pick_from_cache(selected, cache_name="proactive"): """通用: 从缓存中提取选中条目的 feed_id / xsec_token / title""" cache = _cached_proactive_entries if cache_name == "proactive" else _cached_my_note_entries if not selected or not cache: return "", "", "" try: # 尝试从 [N] 前缀提取序号 idx = int(selected.split("]")[0].replace("[", "")) - 1 if 0 <= idx < len(cache): e = cache[idx] return e["feed_id"], e["xsec_token"], e.get("title", "") except (ValueError, IndexError): pass # 回退: 模糊匹配标题 for e in cache: if e.get("title", "")[:15] in selected: return e["feed_id"], e["xsec_token"], e.get("title", "") return "", "", "" # ---- 模块 A: 主动评论他人 ---- def fetch_proactive_notes(keyword, mcp_url): return _fetch_and_cache(keyword, mcp_url, "proactive") def on_proactive_note_selected(selected): return _pick_from_cache(selected, "proactive") def load_note_for_comment(feed_id, xsec_token, mcp_url): """加载目标笔记详情 (标题+正文+已有评论), 用于 AI 分析""" if not feed_id or not xsec_token: return "❌ 请先选择笔记", "", "", "" try: client = 
get_mcp_client(mcp_url) result = client.get_feed_detail(feed_id, xsec_token, load_all_comments=True) if "error" in result: return f"❌ {result['error']}", "", "", "" full_text = result.get("text", "") # 尝试分离正文和评论 if "评论" in full_text: parts = full_text.split("评论", 1) content_part = parts[0].strip() comments_part = "评论" + parts[1] if len(parts) > 1 else "" else: content_part = full_text[:500] comments_part = "" return "✅ 笔记内容已加载", content_part[:800], comments_part[:1500], full_text except Exception as e: return f"❌ {e}", "", "", "" def ai_generate_comment(model, persona, post_title, post_content, existing_comments): """AI 生成主动评论""" api_key, base_url, _ = _get_llm_config() if not api_key: return "⚠️ 请先配置 LLM 提供商", "❌ LLM 未配置" if not model: return "⚠️ 请先连接 LLM", "❌ 未选模型" if not post_title and not post_content: return "⚠️ 请先加载笔记内容", "❌ 无笔记内容" try: svc = LLMService(api_key, base_url, model) comment = svc.generate_proactive_comment( persona, post_title, post_content[:600], existing_comments[:800] ) return comment, "✅ 评论已生成" except Exception as e: logger.error(f"AI 评论生成失败: {e}") return f"生成失败: {e}", f"❌ {e}" def send_comment(feed_id, xsec_token, comment_content, mcp_url): """发送评论到别人的笔记""" if not all([feed_id, xsec_token, comment_content]): return "❌ 缺少必要参数 (笔记ID / token / 评论内容)" try: client = get_mcp_client(mcp_url) result = client.post_comment(feed_id, xsec_token, comment_content) if "error" in result: return f"❌ {result['error']}" return "✅ 评论已发送!" 
except Exception as e: return f"❌ {e}" # ---- 模块 B: 回复我的笔记评论 ---- def fetch_my_notes(mcp_url): """通过已保存的 userId 获取我的笔记列表""" global _cached_my_note_entries my_uid = cfg.get("my_user_id", "") xsec = cfg.get("xsec_token", "") if not my_uid: return ( gr.update(choices=[], value=None), "❌ 未配置用户 ID,请先到「账号登录」页填写并保存", ) if not xsec: return ( gr.update(choices=[], value=None), "❌ 未获取 xsec_token,请先登录", ) try: client = get_mcp_client(mcp_url) result = client.get_user_profile(my_uid, xsec) if "error" in result: return gr.update(choices=[], value=None), f"❌ {result['error']}" # 从 raw 中解析 feeds raw = result.get("raw", {}) text = result.get("text", "") data = None if raw and isinstance(raw, dict): for item in raw.get("content", []): if item.get("type") == "text": try: data = json.loads(item["text"]) except (json.JSONDecodeError, KeyError): pass if not data: try: data = json.loads(text) except (json.JSONDecodeError, TypeError): pass feeds = (data or {}).get("feeds") or [] if not feeds: return ( gr.update(choices=[], value=None), "⚠️ 未找到你的笔记,可能账号还没有发布内容", ) entries = [] for f in feeds: nc = f.get("noteCard") or {} user = nc.get("user") or {} interact = nc.get("interactInfo") or {} entries.append({ "feed_id": f.get("id", ""), "xsec_token": f.get("xsecToken", ""), "title": nc.get("displayTitle", "未知标题"), "author": user.get("nickname", user.get("nickName", "")), "user_id": user.get("userId", ""), "likes": interact.get("likedCount", "0"), "type": nc.get("type", ""), }) _cached_my_note_entries = entries choices = [ f"[{i+1}] {e['title'][:20]} | {e['type']} | ❤{e['likes']}" for i, e in enumerate(entries) ] return ( gr.update(choices=choices, value=choices[0] if choices else None), f"✅ 找到 {len(entries)} 篇笔记", ) except Exception as e: return gr.update(choices=[], value=None), f"❌ {e}" def on_my_note_selected(selected): return _pick_from_cache(selected, "my_notes") def fetch_my_note_comments(feed_id, xsec_token, mcp_url): """获取我的笔记的评论列表""" if not feed_id or not xsec_token: return "❌ 
请先选择笔记", "" try: client = get_mcp_client(mcp_url) result = client.get_feed_detail(feed_id, xsec_token, load_all_comments=True) if "error" in result: return f"❌ {result['error']}", "" return "✅ 评论加载完成", result.get("text", "暂无评论") except Exception as e: return f"❌ {e}", "" def ai_reply_comment(model, persona, post_title, comment_text): """AI 生成评论回复""" api_key, base_url, _ = _get_llm_config() if not api_key: return "⚠️ 请先配置 LLM 提供商", "❌ LLM 未配置" if not model: return "⚠️ 请先连接 LLM 并选择模型", "❌ 未选择模型" if not comment_text: return "请输入需要回复的评论内容", "⚠️ 请输入评论" try: svc = LLMService(api_key, base_url, model) reply = svc.generate_reply(persona, post_title, comment_text) return reply, "✅ 回复已生成" except Exception as e: logger.error(f"AI 回复生成失败: {e}") return f"生成失败: {e}", f"❌ {e}" def send_reply(feed_id, xsec_token, reply_content, mcp_url): """发送评论回复""" if not all([feed_id, xsec_token, reply_content]): return "❌ 缺少必要参数" try: client = get_mcp_client(mcp_url) result = client.post_comment(feed_id, xsec_token, reply_content) if "error" in result: return f"❌ 回复失败: {result['error']}" return "✅ 回复已发送" except Exception as e: return f"❌ 发送失败: {e}" # ================================================== # Tab 4: 数据看板 (我的账号) # ================================================== def _parse_profile_json(text: str): """尝试从文本中解析用户 profile JSON""" if not text: return None # 直接 JSON try: return json.loads(text) except (json.JSONDecodeError, TypeError): pass # 可能包含 Markdown 代码块 m = re.search(r'```(?:json)?\s*\n([\s\S]+?)\n```', text) if m: try: return json.loads(m.group(1)) except (json.JSONDecodeError, TypeError): pass return None def _parse_count(val) -> float: """解析数字字符串, 支持 '1.2万' 格式""" if isinstance(val, (int, float)): return float(val) s = str(val).strip() if "万" in s: try: return float(s.replace("万", "")) * 10000 except ValueError: pass try: return float(s) except ValueError: return 0.0 def fetch_my_profile(user_id, xsec_token, mcp_url): """获取我的账号数据, 返回结构化信息 + 可视化图表""" if not user_id or not 
xsec_token: return "❌ 请填写你的用户 ID 和 xsec_token", "", None, None, None try: client = get_mcp_client(mcp_url) result = client.get_user_profile(user_id, xsec_token) if "error" in result: return f"❌ {result['error']}", "", None, None, None raw = result.get("raw", {}) text = result.get("text", "") # 尝试从 raw 或 text 解析 JSON data = None if raw and isinstance(raw, dict): content_list = raw.get("content", []) for item in content_list: if item.get("type") == "text": data = _parse_profile_json(item.get("text", "")) if data: break if not data: data = _parse_profile_json(text) if not data: return "✅ 数据加载完成 (纯文本)", text, None, None, None # ---- 提取基本信息 (注意 MCP 对新号可能返回 null) ---- basic = data.get("userBasicInfo") or {} interactions = data.get("interactions") or [] feeds = data.get("feeds") or [] gender_map = {0: "未知", 1: "男", 2: "女"} info_lines = [ f"## 👤 {basic.get('nickname', '未知')}", f"- **小红书号**: {basic.get('redId', '-')}", f"- **性别**: {gender_map.get(basic.get('gender', 0), '未知')}", f"- **IP 属地**: {basic.get('ipLocation', '-')}", f"- **简介**: {basic.get('desc', '-')}", "", "### 📊 核心数据", ] for inter in interactions: info_lines.append(f"- **{inter.get('name', '')}**: {inter.get('count', '0')}") info_lines.append(f"\n### 📝 展示笔记: {len(feeds)} 篇") profile_md = "\n".join(info_lines) # ---- 互动数据柱状图 ---- fig_interact = None if interactions: inter_data = {i["name"]: _parse_count(i["count"]) for i in interactions} fig_interact, ax = plt.subplots(figsize=(4, 3), dpi=100) labels = list(inter_data.keys()) values = list(inter_data.values()) colors = ["#FF6B6B", "#4ECDC4", "#45B7D1"][:len(labels)] ax.bar(labels, values, color=colors, edgecolor="white", linewidth=0.5) ax.set_title("账号核心指标", fontsize=12, fontweight="bold") for i, v in enumerate(values): display = f"{v/10000:.1f}万" if v >= 10000 else str(int(v)) ax.text(i, v + max(values) * 0.02, display, ha="center", fontsize=9) ax.set_ylabel("") ax.spines["top"].set_visible(False) ax.spines["right"].set_visible(False) 
fig_interact.tight_layout() # ---- 笔记点赞分布图 ---- fig_notes = None if feeds: titles, likes = [], [] for f in feeds[:15]: nc = f.get("noteCard") or {} t = (nc.get("displayTitle", "") or "无标题")[:12] lk = _parse_count((nc.get("interactInfo") or {}).get("likedCount", "0")) titles.append(t) likes.append(lk) fig_notes, ax2 = plt.subplots(figsize=(7, 3.5), dpi=100) ax2.barh(range(len(titles)), likes, color="#FF6B6B", edgecolor="white") ax2.set_yticks(range(len(titles))) ax2.set_yticklabels(titles, fontsize=8) ax2.set_title(f"笔记点赞排行 (Top {len(titles)})", fontsize=12, fontweight="bold") ax2.invert_yaxis() for i, v in enumerate(likes): display = f"{v/10000:.1f}万" if v >= 10000 else str(int(v)) ax2.text(v + max(likes) * 0.01 if max(likes) > 0 else 0, i, display, va="center", fontsize=8) ax2.spines["top"].set_visible(False) ax2.spines["right"].set_visible(False) fig_notes.tight_layout() # ---- 笔记详情表格 (Markdown) ---- table_lines = [ "### 📋 笔记数据明细", "| # | 标题 | 类型 | ❤ 点赞 |", "|---|------|------|--------|", ] for i, f in enumerate(feeds): nc = f.get("noteCard") or {} t = (nc.get("displayTitle", "") or "无标题")[:25] tp = "📹 视频" if nc.get("type") == "video" else "📷 图文" lk = (nc.get("interactInfo") or {}).get("likedCount", "0") table_lines.append(f"| {i+1} | {t} | {tp} | {lk} |") notes_table = "\n".join(table_lines) return "✅ 数据加载完成", profile_md, fig_interact, fig_notes, notes_table except Exception as e: logger.error(f"获取我的数据失败: {e}") return f"❌ {e}", "", None, None, None # ================================================== # UI 构建 # ================================================== config = cfg.all with gr.Blocks( title="小红书 AI 爆文工坊 V2.0", theme=gr.themes.Soft(), css=""" .status-ok { color: #16a34a; font-weight: bold; } .status-err { color: #dc2626; font-weight: bold; } footer { display: none !important; } """, ) as app: gr.Markdown( "# 🍒 小红书 AI 爆文生产工坊 V2.0\n" "> 灵感 → 文案 → 绘图 → 发布 → 运营,一站式全闭环" ) # 全局状态 state_images = gr.State([]) state_search_result = gr.State("") # ============ 
全局设置栏 ============ with gr.Accordion("⚙️ 全局设置 (自动保存)", open=False): gr.Markdown("#### 🤖 LLM 提供商 (支持所有 OpenAI 兼容接口)") with gr.Row(): llm_provider = gr.Dropdown( label="选择 LLM 提供商", choices=cfg.get_llm_provider_names(), value=cfg.get("active_llm", ""), interactive=True, scale=2, ) btn_connect_llm = gr.Button("🔗 连接 LLM", size="sm", scale=1) with gr.Row(): llm_model = gr.Dropdown( label="LLM 模型", value=config["model"], allow_custom_value=True, interactive=True, scale=2, ) llm_provider_info = gr.Markdown( value="*选择提供商后显示详情*", ) with gr.Accordion("➕ 添加 / 管理 LLM 提供商", open=False): with gr.Row(): new_provider_name = gr.Textbox( label="名称", placeholder="如: DeepSeek / GPT-4o / 通义千问", scale=1, ) new_provider_key = gr.Textbox( label="API Key", type="password", scale=2, ) new_provider_url = gr.Textbox( label="Base URL", placeholder="https://api.openai.com/v1", value="https://api.openai.com/v1", scale=2, ) with gr.Row(): btn_add_provider = gr.Button("✅ 添加提供商", variant="primary", size="sm") btn_del_provider = gr.Button("🗑️ 删除当前提供商", variant="stop", size="sm") provider_mgmt_status = gr.Markdown("") gr.Markdown("---") with gr.Row(): mcp_url = gr.Textbox( label="MCP Server URL", value=config["mcp_url"], scale=2, ) sd_url = gr.Textbox( label="SD WebUI URL", value=config["sd_url"], scale=2, ) persona = gr.Textbox( label="博主人设(评论回复用)", value=config["persona"], scale=3, ) with gr.Row(): btn_connect_sd = gr.Button("🎨 连接 SD", size="sm") btn_check_mcp = gr.Button("📡 检查 MCP", size="sm") with gr.Row(): sd_model = gr.Dropdown( label="SD 模型", allow_custom_value=True, interactive=True, scale=2, ) status_bar = gr.Markdown("🔄 等待连接...") # ============ Tab 页面 ============ with gr.Tabs(): # -------- Tab 1: 内容创作 -------- with gr.Tab("✨ 内容创作"): with gr.Row(): # 左栏:输入 with gr.Column(scale=1): gr.Markdown("### 💡 构思") topic = gr.Textbox(label="笔记主题", placeholder="例如:优衣库早春穿搭") style = gr.Dropdown( ["好物种草", "干货教程", "情绪共鸣", "生活Vlog", "测评避雷", "知识科普"], label="风格", value="好物种草", ) btn_gen_copy = gr.Button("✨ 
第一步:生成文案", variant="primary") gr.Markdown("---") gr.Markdown("### 🎨 绘图参数") with gr.Accordion("高级设置", open=False): neg_prompt = gr.Textbox( label="反向提示词", value=DEFAULT_NEGATIVE, lines=2, ) steps = gr.Slider(15, 50, value=25, step=1, label="步数") cfg_scale = gr.Slider(1, 15, value=7, step=0.5, label="CFG Scale") btn_gen_img = gr.Button("🎨 第二步:生成图片", variant="primary") # 中栏:文案编辑 with gr.Column(scale=1): gr.Markdown("### 📝 文案编辑") res_title = gr.Textbox(label="标题 (≤20字)", interactive=True) res_content = gr.TextArea( label="正文 (可手动修改)", lines=12, interactive=True, ) res_prompt = gr.TextArea( label="绘图提示词", lines=3, interactive=True, ) res_tags = gr.Textbox( label="话题标签 (逗号分隔)", interactive=True, placeholder="穿搭, 春季, 好物种草", ) # 右栏:预览 & 发布 with gr.Column(scale=1): gr.Markdown("### 🖼️ 视觉预览") gallery = gr.Gallery(label="AI 生成图片", columns=2, height=300) local_images = gr.File( label="📁 上传本地图片(可混排)", file_count="multiple", file_types=["image"], ) gr.Markdown("### 🚀 发布") schedule_time = gr.Textbox( label="定时发布 (可选, ISO8601格式)", placeholder="如 2026-02-08T18:00:00+08:00,留空=立即发布", ) with gr.Row(): btn_export = gr.Button("📂 导出本地", variant="secondary") btn_publish = gr.Button("🚀 发布到小红书", variant="primary") publish_msg = gr.Markdown("") # -------- Tab 2: 热点探测 -------- with gr.Tab("🔥 热点探测"): gr.Markdown("### 搜索热门内容 → AI 分析趋势 → 一键借鉴创作") with gr.Row(): with gr.Column(scale=1): hot_keyword = gr.Textbox( label="搜索关键词", placeholder="如:春季穿搭", ) hot_sort = gr.Dropdown( ["综合", "最新", "最多点赞", "最多评论", "最多收藏"], label="排序", value="综合", ) btn_search = gr.Button("🔍 搜索", variant="primary") search_status = gr.Markdown("") with gr.Column(scale=2): search_output = gr.TextArea( label="搜索结果", lines=12, interactive=False, ) with gr.Row(): btn_analyze = gr.Button("🧠 AI 分析热点趋势", variant="primary") analysis_status = gr.Markdown("") analysis_output = gr.Markdown(label="分析报告") topic_from_hot = gr.Textbox( label="选择/输入创作选题", placeholder="基于分析选一个方向", ) with gr.Row(): hot_style = gr.Dropdown( ["好物种草", "干货教程", 
"情绪共鸣", "生活Vlog", "测评避雷"], label="风格", value="好物种草", ) btn_gen_from_hot = gr.Button("✨ 基于热点生成文案", variant="primary") with gr.Row(): hot_title = gr.Textbox(label="生成的标题", interactive=True) hot_content = gr.TextArea(label="生成的正文", lines=8, interactive=True) with gr.Row(): hot_prompt = gr.TextArea(label="绘图提示词", lines=3, interactive=True) hot_tags = gr.Textbox(label="标签", interactive=True) hot_gen_status = gr.Markdown("") btn_sync_to_create = gr.Button( "📋 同步到「内容创作」Tab → 绘图 & 发布", variant="primary", ) # -------- Tab 3: 评论管家 -------- with gr.Tab("💬 评论管家"): gr.Markdown("### 智能评论管理:主动评论引流 & 自动回复粉丝") with gr.Tabs(): # ======== 子 Tab A: 主动评论他人 ======== with gr.Tab("✍️ 主动评论引流"): gr.Markdown( "> **流程**:搜索/浏览笔记 → 选择目标 → 加载内容 → " "AI 分析笔记+已有评论自动生成高质量评论 → 一键发送" ) # 笔记选择器 with gr.Row(): pro_keyword = gr.Textbox( label="🔍 搜索关键词 (留空则获取推荐)", placeholder="穿搭、美食、旅行…", ) btn_pro_fetch = gr.Button("🔍 获取笔记", variant="primary") with gr.Row(): pro_selector = gr.Dropdown( label="📋 选择目标笔记", choices=[], interactive=True, ) pro_fetch_status = gr.Markdown("") # 隐藏字段 with gr.Row(): pro_feed_id = gr.Textbox(label="笔记 ID", interactive=False) pro_xsec_token = gr.Textbox(label="xsec_token", interactive=False) pro_title = gr.Textbox(label="标题", interactive=False) # 加载内容 & AI 分析 btn_pro_load = gr.Button("📖 加载笔记内容", variant="secondary") pro_load_status = gr.Markdown("") with gr.Row(): with gr.Column(scale=1): pro_content = gr.TextArea( label="📄 笔记正文摘要", lines=8, interactive=False, ) with gr.Column(scale=1): pro_comments = gr.TextArea( label="💬 已有评论", lines=8, interactive=False, ) # 隐藏: 完整文本 pro_full_text = gr.Textbox(visible=False) gr.Markdown("---") with gr.Row(): with gr.Column(scale=1): btn_pro_ai = gr.Button( "🤖 AI 智能生成评论", variant="primary", size="lg", ) pro_ai_status = gr.Markdown("") with gr.Column(scale=2): pro_comment_text = gr.TextArea( label="✏️ 评论内容 (可手动修改)", lines=3, interactive=True, placeholder="点击左侧按钮自动生成,也可手动编写", ) with gr.Row(): btn_pro_send = gr.Button("📩 发送评论", variant="primary") 
pro_send_status = gr.Markdown("") # ======== 子 Tab B: 回复我的评论 ======== with gr.Tab("💌 回复粉丝评论"): gr.Markdown( "> **流程**:选择我的笔记 → 加载评论 → " "粘贴要回复的评论 → AI 生成回复 → 一键发送" ) # 笔记选择器 (自动用已保存的 userId 获取) with gr.Row(): btn_my_fetch = gr.Button("🔍 获取我的笔记", variant="primary") with gr.Row(): my_selector = gr.Dropdown( label="📋 选择我的笔记", choices=[], interactive=True, ) my_fetch_status = gr.Markdown("") with gr.Row(): my_feed_id = gr.Textbox(label="笔记 ID", interactive=False) my_xsec_token = gr.Textbox(label="xsec_token", interactive=False) my_title = gr.Textbox(label="笔记标题", interactive=False) btn_my_load_comments = gr.Button("📥 加载评论", variant="primary") my_comment_status = gr.Markdown("") my_comments_display = gr.TextArea( label="📋 粉丝评论列表", lines=12, interactive=False, ) gr.Markdown("---") gr.Markdown("#### 📝 回复评论") with gr.Row(): with gr.Column(scale=1): my_target_comment = gr.TextArea( label="要回复的评论内容", lines=3, placeholder="从上方评论列表中复制粘贴要回复的评论", ) btn_my_ai_reply = gr.Button( "🤖 AI 生成回复", variant="secondary", ) my_reply_gen_status = gr.Markdown("") with gr.Column(scale=1): my_reply_content = gr.TextArea( label="回复内容 (可修改)", lines=3, interactive=True, ) btn_my_send_reply = gr.Button( "📩 发送回复", variant="primary", ) my_reply_status = gr.Markdown("") # -------- Tab 4: 账号登录 -------- with gr.Tab("🔐 账号登录"): gr.Markdown( "### 小红书账号登录\n" "> 扫码登录后自动获取 xsec_token,配合用户 ID 即可使用所有功能" ) with gr.Row(): with gr.Column(scale=1): gr.Markdown( "**操作步骤:**\n" "1. 确保 MCP 服务已启动\n" "2. 点击「获取登录二维码」→ 用小红书 App 扫码\n" "3. 点击「检查登录状态」→ 自动获取并保存 xsec_token\n" "4. 
首次使用请填写你的用户 ID 并点击保存\n\n" "⚠️ 登录后不要在其他网页端登录同一账号,否则会被踢出" ) btn_get_qrcode = gr.Button( "📱 获取登录二维码", variant="primary", size="lg", ) btn_check_login = gr.Button( "🔍 检查登录状态 (自动获取 Token)", variant="secondary", size="lg", ) btn_logout = gr.Button( "🚪 退出登录 (重新扫码)", variant="stop", size="lg", ) login_status = gr.Markdown("🔄 等待操作...") gr.Markdown("---") gr.Markdown( "#### 📌 我的账号信息\n" "> **注意**: 小红书号 ≠ 用户 ID\n" "> - **小红书号 (redId)**: 如 `18688457507`,是你在 App 个人页看到的\n" "> - **用户 ID (userId)**: 如 `5a695db6e8ac2b72e8af2a53`,24位十六进制字符串\n\n" "💡 **如何获取 userId?**\n" "1. 用浏览器打开你的小红书主页\n" "2. 网址格式为: `xiaohongshu.com/user/profile/xxxxxxxx`\n" "3. `profile/` 后面的就是你的 userId" ) login_user_id = gr.Textbox( label="我的用户 ID (24位 userId, 非小红书号)", value=config.get("my_user_id", ""), placeholder="如: 5a695db6e8ac2b72e8af2a53", ) login_xsec_token = gr.Textbox( label="xsec_token (登录后自动获取)", value=config.get("xsec_token", ""), interactive=False, ) btn_save_uid = gr.Button( "💾 保存用户 ID", variant="secondary", ) save_uid_status = gr.Markdown("") with gr.Column(scale=1): qr_image = gr.Image( label="扫码登录", height=350, width=350, ) # -------- Tab 5: 数据看板 -------- with gr.Tab("📊 数据看板"): gr.Markdown( "### 我的账号数据看板\n" "> 用户 ID 和 xsec_token 从「账号登录」自动获取,直接点击加载即可" ) with gr.Row(): with gr.Column(scale=1): data_user_id = gr.Textbox( label="我的用户 ID (自动填充)", value=config.get("my_user_id", ""), interactive=True, ) data_xsec_token = gr.Textbox( label="xsec_token (自动填充)", value=config.get("xsec_token", ""), interactive=True, ) btn_refresh_token = gr.Button( "🔄 刷新 Token", variant="secondary", ) btn_load_my_data = gr.Button( "📊 加载我的数据", variant="primary", size="lg", ) data_status = gr.Markdown("") with gr.Column(scale=2): profile_card = gr.Markdown( value="*等待加载...*", label="账号概览", ) gr.Markdown("---") gr.Markdown("### 📈 数据可视化") with gr.Row(): with gr.Column(scale=1): chart_interact = gr.Plot(label="📊 核心指标") with gr.Column(scale=2): chart_notes = gr.Plot(label="❤ 笔记点赞排行") gr.Markdown("---") notes_detail = gr.Markdown( 
            value="*加载数据后显示笔记明细表格*",
            label="笔记数据明细",
        )

    # ==================================================
    # Event wiring: connect every UI control defined above to its
    # handler. All bindings must live inside the gr.Blocks context.
    # ==================================================

    # ---- Global settings: LLM provider management ----
    btn_connect_llm.click(
        fn=connect_llm,
        inputs=[llm_provider],
        outputs=[llm_model, status_bar],
    )
    # Selecting a provider immediately activates it and shows its masked info.
    llm_provider.change(
        fn=on_provider_selected,
        inputs=[llm_provider],
        outputs=[llm_provider_info],
    )
    btn_add_provider.click(
        fn=add_llm_provider,
        inputs=[new_provider_name, new_provider_key, new_provider_url],
        outputs=[llm_provider, provider_mgmt_status],
    )
    btn_del_provider.click(
        fn=remove_llm_provider,
        inputs=[llm_provider],
        outputs=[llm_provider, provider_mgmt_status],
    )
    btn_connect_sd.click(
        fn=connect_sd,
        inputs=[sd_url],
        outputs=[sd_model, status_bar],
    )
    btn_check_mcp.click(
        fn=check_mcp_status,
        inputs=[mcp_url],
        outputs=[status_bar],
    )

    # ---- Tab 1: content creation ----
    btn_gen_copy.click(
        fn=generate_copy,
        inputs=[llm_model, topic, style],
        outputs=[res_title, res_content, res_prompt, res_tags, status_bar],
    )
    btn_gen_img.click(
        fn=generate_images,
        inputs=[sd_url, res_prompt, neg_prompt, sd_model, steps, cfg_scale],
        outputs=[gallery, state_images, status_bar],
    )
    btn_export.click(
        fn=one_click_export,
        inputs=[res_title, res_content, state_images],
        outputs=[publish_msg],
    )
    btn_publish.click(
        fn=publish_to_xhs,
        inputs=[res_title, res_content, res_tags, state_images, local_images, mcp_url, schedule_time],
        outputs=[publish_msg],
    )

    # ---- Tab 2: hotspot discovery ----
    btn_search.click(
        fn=search_hotspots,
        inputs=[hot_keyword, hot_sort, mcp_url],
        outputs=[search_status, search_output],
    )
    # Mirror the raw search result into gr.State so later steps can reuse it.
    search_output.change(
        fn=lambda x: x,
        inputs=[search_output],
        outputs=[state_search_result],
    )
    btn_analyze.click(
        fn=analyze_and_suggest,
        inputs=[llm_model, hot_keyword, search_output],
        outputs=[analysis_status, analysis_output, topic_from_hot],
    )
    btn_gen_from_hot.click(
        fn=generate_from_hotspot,
        inputs=[llm_model, topic_from_hot, hot_style, search_output],
        outputs=[hot_title, hot_content, hot_prompt, hot_tags,
                 hot_gen_status],
    )
    # Copy the hotspot draft into the "content creation" tab's fields so the
    # user can continue with image generation and publishing there.
    btn_sync_to_create.click(
        fn=lambda t, c, p, tg: (t, c, p, tg, "✅ 已同步到「内容创作」,可切换 Tab 继续绘图和发布"),
        inputs=[hot_title, hot_content, hot_prompt, hot_tags],
        outputs=[res_title, res_content, res_prompt, res_tags, status_bar],
    )

    # ---- Tab 3: comment assistant ----
    # == Sub-tab A: proactive commenting on other users' notes ==
    btn_pro_fetch.click(
        fn=fetch_proactive_notes,
        inputs=[pro_keyword, mcp_url],
        outputs=[pro_selector, pro_fetch_status],
    )
    pro_selector.change(
        fn=on_proactive_note_selected,
        inputs=[pro_selector],
        outputs=[pro_feed_id, pro_xsec_token, pro_title],
    )
    btn_pro_load.click(
        fn=load_note_for_comment,
        inputs=[pro_feed_id, pro_xsec_token, mcp_url],
        outputs=[pro_load_status, pro_content, pro_comments, pro_full_text],
    )
    btn_pro_ai.click(
        fn=ai_generate_comment,
        inputs=[llm_model, persona, pro_title, pro_content, pro_comments],
        outputs=[pro_comment_text, pro_ai_status],
    )
    btn_pro_send.click(
        fn=send_comment,
        inputs=[pro_feed_id, pro_xsec_token, pro_comment_text, mcp_url],
        outputs=[pro_send_status],
    )

    # == Sub-tab B: replying to comments on my own notes ==
    btn_my_fetch.click(
        fn=fetch_my_notes,
        inputs=[mcp_url],
        outputs=[my_selector, my_fetch_status],
    )
    my_selector.change(
        fn=on_my_note_selected,
        inputs=[my_selector],
        outputs=[my_feed_id, my_xsec_token, my_title],
    )
    btn_my_load_comments.click(
        fn=fetch_my_note_comments,
        inputs=[my_feed_id, my_xsec_token, mcp_url],
        outputs=[my_comment_status, my_comments_display],
    )
    btn_my_ai_reply.click(
        fn=ai_reply_comment,
        inputs=[llm_model, persona, my_title, my_target_comment],
        outputs=[my_reply_content, my_reply_gen_status],
    )
    btn_my_send_reply.click(
        fn=send_reply,
        inputs=[my_feed_id, my_xsec_token, my_reply_content, mcp_url],
        outputs=[my_reply_status],
    )

    # ---- Tab 4: account login ----
    btn_get_qrcode.click(
        fn=get_login_qrcode,
        inputs=[mcp_url],
        outputs=[qr_image, login_status],
    )
    # On success this also auto-fills the userId / xsec_token textboxes.
    btn_check_login.click(
        fn=check_login,
        inputs=[mcp_url],
        outputs=[login_status, login_user_id, login_xsec_token],
    )
    btn_logout.click(
        fn=logout_xhs,
        inputs=[mcp_url],
        outputs=[login_status],
    )
btn_save_uid.click( fn=save_my_user_id, inputs=[login_user_id], outputs=[save_uid_status], ) # ---- Tab 5: 数据看板 ---- def refresh_xsec_token(mcp_url): token = _auto_fetch_xsec_token(mcp_url) if token: cfg.set("xsec_token", token) return gr.update(value=token), "✅ Token 已刷新" return gr.update(value=cfg.get("xsec_token", "")), "❌ 刷新失败,请确认已登录" btn_refresh_token.click( fn=refresh_xsec_token, inputs=[mcp_url], outputs=[data_xsec_token, data_status], ) btn_load_my_data.click( fn=fetch_my_profile, inputs=[data_user_id, data_xsec_token, mcp_url], outputs=[data_status, profile_card, chart_interact, chart_notes, notes_detail], ) # ---- 启动时自动刷新 SD ---- app.load(fn=connect_sd, inputs=[sd_url], outputs=[sd_model, status_bar]) # ================================================== if __name__ == "__main__": logger.info("🍒 小红书 AI 爆文工坊 V2.0 启动中...") app.launch(inbrowser=True, share=False)