xhs_factory/analytics_service.py
zhoujie 1ea8bfb554 feat(analytics): improve MCP data-parsing compatibility
- Improve data extraction for user profiles and note details: read content from `raw["raw"]["content"]` first, falling back to `raw["content"]`
- Add a fallback path in note-detail parsing that extracts text from `result["text"]`
- In user feed parsing, read the note ID from `f["id"]` first, and log a warning for entries without an ID

✨ feat(persona): expand the persona pool and integrate visual style profiles

- Add the "赛博AI虚拟博主" and "性感福利主播" personas along with their topics and keywords
- Add a `PERSONA_SD_PROFILES` dict in `sd_service.py` that defines visual boost terms, a style suffix, and LLM drawing guidance for each persona
- Add a `get_persona_sd_profile` function that matches persona text to its visual profile

♻️ refactor(llm): rework SD prompt generation to support personas

- Add a `persona` parameter to the `LLMService.get_sd_prompt_guide` signature
- Append persona-specific visual style guidance to the generated drawing guide when a persona matches
- For the "赛博AI虚拟博主" persona, relax the anti-AI-detection hints to allow high-quality keywords and professional lighting terms
- Update all callers of `get_sd_prompt_guide` (e.g. the copywriting functions) to pass the `persona` argument

♻️ refactor(sd): rework the text-to-image service to support persona visual boosts

- Add a `persona` parameter to the `SDService.txt2img` signature
- Inject the persona-specific boost terms (`prompt_boost`) and style terms (`prompt_style`) into the final prompt (see the sketch after this list)
- Append the persona-specific extra negative terms (`negative_extra`) to the final negative prompt
- Log a message when persona visual boosts are injected
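
A rough sketch of the injection described above, assuming the names from these notes (`get_persona_sd_profile`, `prompt_boost`, `prompt_style`, `negative_extra`); the real `SDService.txt2img` body is not shown in this file, so the concatenation order and the log text are illustrative only:

    profile = get_persona_sd_profile(persona)  # sd_service.py helper added in this commit
    if profile:
        # illustrative order: boost terms first, then the base prompt, then the style suffix
        prompt = f"{profile['prompt_boost']}, {prompt}, {profile['prompt_style']}"
        negative_prompt = f"{negative_prompt}, {profile['negative_extra']}"
        logger.info("persona visual boost injected: %s", persona)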

🔧 chore(config): update the default persona configuration

- Change the default `persona` in `config_manager.py` from "身材管理健身美女" to "性感福利主播"

🔧 chore(main): update UI function signatures to pass the persona through

- Add a `persona_text` parameter to the `generate_images` signature and resolve it to a persona object internally
- Pass the `persona` argument to the `sd_svc.txt2img` calls in `auto_publish_once` and `generate_to_queue`
- Wire the `persona` input into `generate_images` in the click handler for `btn_gen_img` in the Gradio UI
2026-02-10 22:29:55 +08:00


"""
Note analytics & smart weight-learning module.
Periodically fetches engagement data for published notes, learns which content
performs well, and builds a weighted topic pool.
"""
import json
import os
import re
import time
import logging
import math
from datetime import datetime, timedelta
from collections import defaultdict

logger = logging.getLogger(__name__)

ANALYTICS_FILE = "analytics_data.json"
WEIGHTS_FILE = "content_weights.json"


def _safe_int(val) -> int:
    """Convert values like '1.2万' / '1234' / 1234 to an int."""
    if isinstance(val, (int, float)):
        return int(val)
    if not val:
        return 0
    s = str(val).strip()
    if "万" in s:
        try:
            return int(float(s.replace("万", "")) * 10000)
        except ValueError:
            return 0
    try:
        return int(float(s))
    except ValueError:
        return 0
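
# A quick worked example of the conversions above (values are illustrative):
# _safe_int("1.2万") -> 12000, _safe_int("1234") -> 1234, _safe_int(1234) -> 1234,
# and unparseable input such as _safe_int("n/a") -> 0.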


class AnalyticsService:
    """Note-performance analytics & weight-learning engine."""

    def __init__(self, workspace_dir: str = "xhs_workspace"):
        self.workspace_dir = workspace_dir
        self.analytics_path = os.path.join(workspace_dir, ANALYTICS_FILE)
        self.weights_path = os.path.join(workspace_dir, WEIGHTS_FILE)
        self._analytics_data = self._load_json(self.analytics_path, {"notes": {}, "last_analysis": ""})
        self._weights = self._load_json(self.weights_path, {
            "topic_weights": {},
            "style_weights": {},
            "tag_weights": {},
            "title_pattern_weights": {},
            "time_weights": {},
            "last_updated": "",
            "analysis_history": [],
        })

    # ========== Persistence ==========
    @staticmethod
    def _load_json(path: str, default: dict) -> dict:
        if os.path.exists(path):
            try:
                with open(path, "r", encoding="utf-8") as f:
                    return json.load(f)
            except (json.JSONDecodeError, IOError) as e:
                logger.warning("加载 %s 失败: %s,使用默认值", path, e)
        return default.copy()

    def _save_analytics(self):
        os.makedirs(self.workspace_dir, exist_ok=True)
        with open(self.analytics_path, "w", encoding="utf-8") as f:
            json.dump(self._analytics_data, f, ensure_ascii=False, indent=2)

    def _save_weights(self):
        os.makedirs(self.workspace_dir, exist_ok=True)
        with open(self.weights_path, "w", encoding="utf-8") as f:
            json.dump(self._weights, f, ensure_ascii=False, indent=2)
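
    # For reference, the two JSON files persisted above look roughly like this
    # (field names come from the defaults in __init__; the sample values are
    # illustrative only):
    #
    #   analytics_data.json:  {"notes": {"<note_id>": {"title": "...", "likes": 0, ...}},
    #                          "last_analysis": "2026-02-10T22:00:00"}
    #   content_weights.json: {"topic_weights": {...}, "style_weights": {...},
    #                          "tag_weights": {...}, "title_pattern_weights": {...},
    #                          "time_weights": {...}, "last_updated": "...",
    #                          "analysis_history": [...]}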

    # ========== Data collection ==========
    def collect_note_performance(self, mcp_client, user_id: str, xsec_token: str) -> dict:
        """
        Fetch all of my notes and their engagement data via MCP and store them
        in analytics_data.json.
        Returns {"total": N, "updated": M, "notes": [...]}.
        """
        logger.info("开始采集笔记表现数据 (user_id=%s)", user_id)
        raw = mcp_client.get_user_profile(user_id, xsec_token)
        text = ""
        if isinstance(raw, dict):
            # _call_tool returns {"success": True, "text": "...", "raw": <raw MCP response>}.
            # Prefer extracting from raw["raw"]["content"], but also accept a direct "content" key.
            inner_raw = raw.get("raw", {})
            content_list = []
            if isinstance(inner_raw, dict):
                content_list = inner_raw.get("content", [])
            if not content_list:
                content_list = raw.get("content", [])
            for item in content_list:
                if isinstance(item, dict) and item.get("type") == "text":
                    text = item.get("text", "")
                    break
            if not text:
                text = raw.get("text", "")
        # Parse the JSON payload
        data = None
        for attempt_fn in [
            lambda t: json.loads(t),
            lambda t: json.loads(re.search(r'```(?:json)?\s*\n([\s\S]+?)\n```', t).group(1)),
            lambda t: json.loads(re.search(r'(\{[\s\S]*\})', t).group(1)),
        ]:
            try:
                data = attempt_fn(text)
                if data:
                    break
            except Exception:
                continue
        if not data:
            return {"total": 0, "updated": 0, "error": "无法解析用户数据"}
        feeds = data.get("feeds", [])
        if not feeds:
            return {"total": 0, "updated": 0, "error": "未找到笔记数据"}
        notes_dict = self._analytics_data.get("notes", {})
        updated = 0
        note_summaries = []
        for f in feeds:
            nc = f.get("noteCard") or {}
            # In the MCP user-profile feeds, the note ID lives in f["id"] rather than nc["noteId"]
            note_id = nc.get("noteId") or f.get("id", "") or f.get("noteId", "")
            if not note_id:
                logger.warning("跳过无 ID 的笔记条目: keys=%s", list(f.keys()))
                continue
            interact = nc.get("interactInfo") or {}
            liked = _safe_int(interact.get("likedCount", 0))
            # The user-profile note list returned by MCP usually only carries likedCount;
            # comment and collect counts only appear on the detail page, so likes are the
            # primary metric for now.
            title = nc.get("displayTitle", "") or ""
            note_type = nc.get("type", "normal")  # normal / video
            # Pull topic, style, and tags from the locally backed-up copy
            local_meta = self._find_local_meta(title)
            note_data = {
                "note_id": note_id,
                "title": title,
                "type": note_type,
                "likes": liked,
                "topic": local_meta.get("topic", ""),
                "style": local_meta.get("style", ""),
                "tags": local_meta.get("tags", []),
                "sd_prompt": local_meta.get("sd_prompt", ""),
                "collected_at": datetime.now().isoformat(),
            }
            # Update existing entry or add a new one
            old = notes_dict.get(note_id, {})
            if old.get("likes", 0) != liked or not old:
                updated += 1
            notes_dict[note_id] = {**old, **note_data}
            note_summaries.append(note_data)
        self._analytics_data["notes"] = notes_dict
        self._analytics_data["last_analysis"] = datetime.now().isoformat()
        self._save_analytics()
        logger.info("采集完成: 共 %d 篇笔记, 更新 %d", len(feeds), updated)
        return {"total": len(feeds), "updated": updated, "notes": note_summaries}
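
    # Shape of the payload the extraction above is written against (assumed from the
    # fallback chain in this file, not from an MCP spec; other keys may also exist):
    #
    #   {"success": True,
    #    "text": "<plain-text fallback>",
    #    "raw": {"content": [{"type": "text", "text": "{\"feeds\": [...]}"}]}}
    #
    # The JSON inside the text item may also arrive fenced in ```json ... ``` blocks,
    # which is why the parser tries the fenced and brace-matching regexes as fallbacks.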

    def collect_note_details(self, mcp_client, note_id: str, xsec_token: str):
        """Fetch detailed data for a single note (likes, comment count, collects, etc.)."""
        try:
            result = mcp_client.get_feed_detail(note_id, xsec_token, load_all_comments=False)
            text = ""
            if isinstance(result, dict):
                # Handle the _call_tool wrapper format
                inner_raw = result.get("raw", {})
                content_list = []
                if isinstance(inner_raw, dict):
                    content_list = inner_raw.get("content", [])
                if not content_list:
                    content_list = result.get("content", [])
                for item in content_list:
                    if isinstance(item, dict) and item.get("type") == "text":
                        text = item.get("text", "")
                        break
                if not text:
                    text = result.get("text", "")
            if text:
                data = None
                try:
                    data = json.loads(text)
                except Exception:
                    m = re.search(r'(\{[\s\S]*\})', text)
                    if m:
                        try:
                            data = json.loads(m.group(1))
                        except Exception:
                            pass
                if data:
                    interact = data.get("interactInfo") or {}
                    comments = data.get("comments", [])
                    return {
                        "likes": _safe_int(interact.get("likedCount", 0)),
                        "comments_count": _safe_int(interact.get("commentCount", len(comments))),
                        "collects": _safe_int(interact.get("collectedCount", 0)),
                        "shares": _safe_int(interact.get("shareCount", 0)),
                    }
        except Exception as e:
            logger.warning("获取笔记 %s 详情失败: %s", note_id, e)
        return None
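
    # Example return value (illustrative numbers only): a successful call yields
    #   {"likes": 1200, "comments_count": 45, "collects": 300, "shares": 12}
    # while any transport or parsing failure yields None, so callers should treat
    # the result as optional.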

    def _find_local_meta(self, title: str) -> dict:
        """Find the backed-up copy in the local xhs_workspace that matches the title and extract topic/style/tags."""
        result = {"topic": "", "style": "", "tags": [], "sd_prompt": ""}
        if not title:
            return result
        # Scan the backup directories
        try:
            for dirname in os.listdir(self.workspace_dir):
                dir_path = os.path.join(self.workspace_dir, dirname)
                if not os.path.isdir(dir_path) or dirname.startswith("_"):
                    continue
                txt_path = os.path.join(dir_path, "文案.txt")
                if not os.path.exists(txt_path):
                    continue
                try:
                    with open(txt_path, "r", encoding="utf-8") as f:
                        content = f.read()
                    # Check whether the title matches this backup
                    if title[:10] in content or title in dirname:
                        # Extract the metadata lines
                        for line in content.split("\n"):
                            if line.startswith("风格:"):
                                result["style"] = line.split(":", 1)[1].strip()
                            elif line.startswith("主题:"):
                                result["topic"] = line.split(":", 1)[1].strip()
                            elif line.startswith("标签:"):
                                tags_str = line.split(":", 1)[1].strip()
                                result["tags"] = [t.strip() for t in tags_str.split(",") if t.strip()]
                            elif line.startswith("SD Prompt:"):
                                result["sd_prompt"] = line.split(":", 1)[1].strip()
                        break
                except Exception:
                    continue
        except Exception:
            pass
        return result
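
    # The parser above expects the backup 文案.txt to contain metadata lines of the
    # following shape (an assumed example; only the "风格:"/"主题:"/"标签:"/"SD Prompt:"
    # prefixes are actually required, the values are illustrative):
    #
    #   主题: 健身打卡
    #   风格: 真实分享
    #   标签: 健身, 自律, 日常
    #   SD Prompt: 1girl, gym, ...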

    # ========== Weight calculation ==========
    def calculate_weights(self) -> dict:
        """
        Compute per-dimension weights from the collected note-performance data.
        Each note is scored as: engagement score = likes * 1.0 + comments * 2.0 + collects * 1.5.
        Returns a summary of the computed weights.
        """
        notes = self._analytics_data.get("notes", {})
        if not notes:
            return {"error": "暂无笔记数据,请先采集"}
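        # Worked example of the scoring and normalisation below (numbers are made up):
        # a note with 120 likes, 10 comments and 30 collects scores
        # 120*1.0 + 10*2.0 + 30*1.5 = 185. If the best note scores 370, that note's
        # topic gets weight min(100, int(185 / 370 * 100)) = 50, plus a +5 / +10 bonus
        # once the topic has been seen in 2 / 3 or more notes.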
        # Compute a composite score for each note
        scored_notes = []
        for nid, note in notes.items():
            likes = note.get("likes", 0)
            comments_count = note.get("comments_count", 0)
            collects = note.get("collects", 0)
            # Composite score: likes weighted 1.0, comments 2.0 (comments signal deep engagement), collects 1.5
            score = likes * 1.0 + comments_count * 2.0 + collects * 1.5
            # Fall back to the raw like count as a floor
            if score == 0:
                score = likes
            scored_notes.append({**note, "score": score, "note_id": nid})
        if not scored_notes:
            return {"error": "没有可分析的笔记"}
        # Sort by score
        scored_notes.sort(key=lambda x: x["score"], reverse=True)
        max_score = scored_notes[0]["score"] if scored_notes[0]["score"] > 0 else 1

        # ---- Topic weights ----
        topic_scores = defaultdict(float)
        topic_counts = defaultdict(int)
        for note in scored_notes:
            topic = note.get("topic", "").strip()
            if topic:
                topic_scores[topic] += note["score"]
                topic_counts[topic] += 1
        topic_weights = {}
        for topic, total_score in topic_scores.items():
            avg_score = total_score / topic_counts[topic]
            # Normalise to 0-100
            weight = min(100, int((avg_score / max_score) * 100)) if max_score > 0 else 50
            # Bonus for topics validated by multiple notes
            if topic_counts[topic] >= 3:
                weight = min(100, weight + 10)
            elif topic_counts[topic] >= 2:
                weight = min(100, weight + 5)
            topic_weights[topic] = {
                "weight": weight,
                "count": topic_counts[topic],
                "avg_score": round(avg_score, 1),
                "total_score": round(total_score, 1),
            }

        # ---- Style weights ----
        style_scores = defaultdict(float)
        style_counts = defaultdict(int)
        for note in scored_notes:
            style = note.get("style", "").strip()
            if style:
                style_scores[style] += note["score"]
                style_counts[style] += 1
        style_weights = {}
        for style, total_score in style_scores.items():
            avg = total_score / style_counts[style]
            weight = min(100, int((avg / max_score) * 100)) if max_score > 0 else 50
            style_weights[style] = {
                "weight": weight,
                "count": style_counts[style],
                "avg_score": round(avg, 1),
            }

        # ---- Tag weights ----
        tag_scores = defaultdict(float)
        tag_counts = defaultdict(int)
        for note in scored_notes:
            for tag in note.get("tags", []):
                tag = tag.strip().lstrip("#")
                if tag:
                    tag_scores[tag] += note["score"]
                    tag_counts[tag] += 1
        tag_weights = {}
        for tag, total_score in tag_scores.items():
            avg = total_score / tag_counts[tag]
            weight = min(100, int((avg / max_score) * 100)) if max_score > 0 else 50
            tag_weights[tag] = {"weight": weight, "count": tag_counts[tag]}
        # Sort and keep only the top entries
        tag_weights = dict(sorted(tag_weights.items(), key=lambda x: x[1]["weight"], reverse=True)[:30])

        # ---- Title-pattern weights (emoji / sentence pattern / length features) ----
        title_patterns = defaultdict(list)
        for note in scored_notes:
            title = note.get("title", "")
            if not title:
                continue
            # Detect title features
            has_emoji = bool(re.search(r'[\U0001F600-\U0001F9FF\u2600-\u27BF]', title))
            has_question = "？" in title or "?" in title
            has_exclaim = "！" in title or "!" in title
            has_ellipsis = "..." in title or "…" in title
            length_bucket = "短(≤10)" if len(title) <= 10 else ("中(11-15)" if len(title) <= 15 else "长(16-20)")
            for feature, val in [
                ("含emoji", has_emoji), ("疑问句式", has_question),
                ("感叹句式", has_exclaim), ("省略句式", has_ellipsis),
            ]:
                if val:
                    title_patterns[feature].append(note["score"])
            title_patterns[f"长度:{length_bucket}"].append(note["score"])
        title_pattern_weights = {}
        for pattern, scores in title_patterns.items():
            avg = sum(scores) / len(scores) if scores else 0
            title_pattern_weights[pattern] = {
                "weight": min(100, int((avg / max_score) * 100)) if max_score > 0 else 50,
                "count": len(scores),
                "avg_score": round(avg, 1),
            }

        # ---- Publish-time weights ----
        time_scores = defaultdict(list)
        for note in scored_notes:
            collected = note.get("collected_at", "")
            if collected:
                try:
                    dt = datetime.fromisoformat(collected)
                    hour_bucket = f"{(dt.hour // 3) * 3:02d}-{(dt.hour // 3) * 3 + 3:02d}"
                    time_scores[hour_bucket].append(note["score"])
                except Exception:
                    pass
        time_weights = {}
        for bucket, scores in time_scores.items():
            avg = sum(scores) / len(scores) if scores else 0
            time_weights[bucket] = {
                "weight": min(100, int((avg / max_score) * 100)) if max_score > 0 else 50,
                "count": len(scores),
            }

        # ---- Persist the weights ----
        # Sort by weight first so the persisted copy, the history entry, and the
        # returned summary all rank topics/styles consistently
        topic_weights = dict(sorted(topic_weights.items(), key=lambda x: x[1]["weight"], reverse=True))
        style_weights = dict(sorted(style_weights.items(), key=lambda x: x[1]["weight"], reverse=True))
        self._weights.update({
            "topic_weights": topic_weights,
            "style_weights": style_weights,
            "tag_weights": tag_weights,
            "title_pattern_weights": title_pattern_weights,
            "time_weights": time_weights,
            "last_updated": datetime.now().isoformat(),
            "total_notes_analyzed": len(scored_notes),
            "top_note": {
                "title": scored_notes[0].get("title", ""),
                "score": scored_notes[0].get("score", 0),
                "likes": scored_notes[0].get("likes", 0),
            } if scored_notes else {},
        })
        # Append to the analysis history
        history = self._weights.get("analysis_history", [])
        history.append({
            "time": datetime.now().isoformat(),
            "total_notes": len(scored_notes),
            "avg_score": round(sum(n["score"] for n in scored_notes) / len(scored_notes), 1),
            "top_topic": list(topic_weights.keys())[0] if topic_weights else "",
        })
        # Keep only the most recent 50 entries
        self._weights["analysis_history"] = history[-50:]
        self._save_weights()
        return {
            "total_notes": len(scored_notes),
            "top_topics": list(topic_weights.items())[:10],
            "top_styles": list(style_weights.items())[:5],
            "top_tags": list(tag_weights.items())[:10],
            "title_patterns": title_pattern_weights,
            "top_note": scored_notes[0] if scored_notes else None,
        }
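
    # Note on the time buckets built above: hours are grouped into 3-hour windows,
    # e.g. a note collected at 14:37 falls into bucket "12-15" because
    # (14 // 3) * 3 = 12 and 12 + 3 = 15. (The bucketing currently uses collected_at,
    # i.e. the collection time, as a stand-in for the publish time.)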

    # ========== Weighted topic selection ==========
    def get_weighted_topic(self, base_topics: list[str] = None) -> str:
        """
        Pick a topic from the topic pool using weighted random selection.
        Falls back to uniform random choice when no weight data is available.
        """
        import random
        topic_weights = self._weights.get("topic_weights", {})
        if not topic_weights:
            # No weight data yet: pick uniformly from the base pool
            return random.choice(base_topics) if base_topics else "日常分享"
        # Merge: topics that already have weights + new topics from base_topics
        all_topics = {}
        for topic, info in topic_weights.items():
            all_topics[topic] = info.get("weight", 50)
        if base_topics:
            for t in base_topics:
                if t not in all_topics:
                    all_topics[t] = 30  # give new topics a baseline weight
        # Weighted random choice
        topics = list(all_topics.keys())
        weights = [max(1, all_topics[t]) for t in topics]  # keep every weight >= 1
        chosen = random.choices(topics, weights=weights, k=1)[0]
        logger.info("加权选题: %s (权重: %s)", chosen, all_topics.get(chosen, "?"))
        return chosen
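
    # Illustration of the weighting above (hypothetical numbers): with
    # all_topics = {"健身打卡": 80, "日常穿搭": 40, "新主题": 30}, random.choices draws
    # 健身打卡 roughly 80/150 of the time, 日常穿搭 40/150, and the unseen topic 30/150,
    # so learned winners dominate without completely starving new topics.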

    def get_weighted_style(self, base_styles: list[str] = None) -> str:
        """Pick a style using the learned weights."""
        import random
        style_weights = self._weights.get("style_weights", {})
        if not style_weights:
            return random.choice(base_styles) if base_styles else "真实分享"
        all_styles = {}
        for style, info in style_weights.items():
            all_styles[style] = info.get("weight", 50)
        if base_styles:
            for s in base_styles:
                if s not in all_styles:
                    all_styles[s] = 30
        styles = list(all_styles.keys())
        weights = [max(1, all_styles[s]) for s in styles]
        return random.choices(styles, weights=weights, k=1)[0]

    def get_top_tags(self, n: int = 8) -> list[str]:
        """Return the N highest-weighted tags."""
        tag_weights = self._weights.get("tag_weights", {})
        if not tag_weights:
            return []
        sorted_tags = sorted(tag_weights.items(), key=lambda x: x[1].get("weight", 0), reverse=True)
        return [t[0] for t in sorted_tags[:n]]

    def get_title_advice(self) -> str:
        """Generate advice from the title-pattern weights."""
        patterns = self._weights.get("title_pattern_weights", {})
        if not patterns:
            return "暂无标题分析数据"
        sorted_p = sorted(patterns.items(), key=lambda x: x[1].get("weight", 0), reverse=True)
        advice_parts = []
        for p_name, p_info in sorted_p[:5]:
            advice_parts.append(f"{p_name}: 权重 {p_info['weight']}分 (出现{p_info['count']}次)")
        return "\n".join(advice_parts)

    # ========== LLM deep analysis ==========
    def generate_llm_analysis_prompt(self) -> str:
        """Build the data section of the prompt that asks the LLM to analyse note performance."""
        notes = self._analytics_data.get("notes", {})
        if not notes:
            return ""
        # Sort by likes
        sorted_notes = sorted(notes.values(), key=lambda x: x.get("likes", 0), reverse=True)
        lines = []
        for i, note in enumerate(sorted_notes[:20]):
            lines.append(
                f"#{i+1} {note.get('title', '无标题')}\n"
                f" 点赞: {note.get('likes', 0)} | 主题: {note.get('topic', '未知')} | "
                f"风格: {note.get('style', '未知')}\n"
                f" 标签: {', '.join(note.get('tags', []))}"
            )
        return "\n".join(lines)

    # ========== Report generation ==========
    def generate_report(self) -> str:
        """Generate the analysis report in Markdown."""
        weights = self._weights
        notes = self._analytics_data.get("notes", {})
        if not notes:
            return "## 📊 暂无分析数据\n\n请先点击「采集数据」获取笔记表现数据,再点击「计算权重」。"
        total = len(notes)
        last_updated = weights.get("last_updated", "未知")
        # Top note
        top_note = weights.get("top_note", {})
        top_note_str = f"**{top_note.get('title', '')}** (❤️ {top_note.get('likes', 0)})" if top_note else "暂无"
        lines = [
            "## 📊 智能内容学习报告",
            "",
            f"🕐 最后更新: {last_updated[:19] if last_updated else '从未'}",
            f"📝 分析笔记数: **{total}** 篇",
            f"🏆 最佳笔记: {top_note_str}",
            "",
            "---",
            "",
        ]
        # Topic weights
        topic_w = weights.get("topic_weights", {})
        if topic_w:
            lines.append("### 🎯 主题权重排行")
            lines.append("| 排名 | 主题 | 权重 | 笔记数 | 平均得分 |")
            lines.append("|:---:|------|:---:|:---:|:---:|")
            for idx, (topic, info) in enumerate(list(topic_w.items())[:10]):
                bar = "█" * (info["weight"] // 10) + "░" * (10 - info["weight"] // 10)
                lines.append(
                    f"| {idx+1} | {topic} | {bar} {info['weight']} | {info['count']} | {info['avg_score']} |"
                )
            lines.append("")
        # Style weights
        style_w = weights.get("style_weights", {})
        if style_w:
            lines.append("### 🎨 风格权重排行")
            for style, info in list(style_w.items())[:5]:
                bar = "█" * (info["weight"] // 10) + "░" * (10 - info["weight"] // 10)
                lines.append(f"- **{style}**: {bar} {info['weight']}分 ({info['count']}篇)")
            lines.append("")
        # Tag weights
        tag_w = weights.get("tag_weights", {})
        if tag_w:
            lines.append("### 🏷️ 高权重标签 (Top 10)")
            top_tags = list(tag_w.items())[:10]
            tag_strs = [f"`#{t}` ({info['weight']})" for t, info in top_tags]
            lines.append(" | ".join(tag_strs))
            lines.append("")
        # Title patterns
        title_p = weights.get("title_pattern_weights", {})
        if title_p:
            lines.append("### ✏️ 标题模式分析")
            sorted_p = sorted(title_p.items(), key=lambda x: x[1].get("weight", 0), reverse=True)
            for p_name, p_info in sorted_p[:6]:
                lines.append(f"- **{p_name}**: 权重 {p_info['weight']} (出现 {p_info['count']} 次)")
            lines.append("")
        # Suggestions
        lines.append("---")
        lines.append("### 💡 智能建议")
        if topic_w:
            top_3 = list(topic_w.keys())[:3]
            lines.append(f"- 📌 **高权重主题**: 优先创作 → {', '.join(top_3)}")
        if tag_w:
            hot_tags = [f"#{t}" for t in list(tag_w.keys())[:5]]
            lines.append(f"- 🏷️ **推荐标签**: {' '.join(hot_tags)}")
        if title_p:
            best_pattern = max(title_p.items(), key=lambda x: x[1].get("weight", 0))
            lines.append(f"- ✏️ **标题建议**: 多用「{best_pattern[0]}」(权重{best_pattern[1]['weight']})")
        lines.append("")
        lines.append("> 💡 启用「智能加权发布」后,自动发布将按权重倾斜生成高表现内容")
        return "\n".join(lines)

    def get_weighted_topics_display(self) -> str:
        """Return the weighted topic list for display in the UI."""
        topic_w = self._weights.get("topic_weights", {})
        if not topic_w:
            return ""
        # Sorted by weight, returned comma-separated
        sorted_topics = sorted(topic_w.items(), key=lambda x: x[1].get("weight", 0), reverse=True)
        return ", ".join([t[0] for t in sorted_topics[:15]])

    @property
    def has_weights(self) -> bool:
        """Whether weight data is already available."""
        return bool(self._weights.get("topic_weights"))

    @property
    def weights_summary(self) -> str:
        """One-line summary of the learned weights."""
        tw = self._weights.get("topic_weights", {})
        total = self._weights.get("total_notes_analyzed", 0)
        if not tw:
            return "暂无权重数据"
        top = list(tw.keys())[:3]
        return f"{total}篇笔记 | 热门: {', '.join(top)}"
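

# Minimal offline usage sketch (assumes a local "xhs_workspace" directory; the MCP
# collection step is skipped here, so this only exercises weight calculation and
# report generation on whatever analytics_data.json already contains).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    svc = AnalyticsService(workspace_dir="xhs_workspace")
    summary = svc.calculate_weights()
    print(summary)
    print(svc.weights_summary)
    print(svc.generate_report())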