"""Note analytics and adaptive weight-learning module.

Collects engagement data for published notes, learns which content performs
well, and produces weighted pools of topics, styles, tags and title patterns.
"""
import json
import logging
import math
import os
import random
import re
import time
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Optional

logger = logging.getLogger(__name__)

ANALYTICS_FILE = "analytics_data.json"
WEIGHTS_FILE = "content_weights.json"

# Patterns used to dig a JSON document out of free-form tool output.
_FENCED_JSON_RE = re.compile(r'```(?:json)?\s*\n([\s\S]+?)\n```')
_BRACED_JSON_RE = re.compile(r'(\{[\s\S]*\})')

# "<label>:<value>" metadata lines inside a backup copy file.  Both the ASCII
# colon/comma and their full-width forms (\uff1a / \uff0c) are accepted.
_META_LINE_RE = re.compile(r'^(风格|主题|标签|SD Prompt)[:\uff1a](.*)$')
_META_KEYS = {
    "风格": "style",
    "主题": "topic",
    "标签": "tags",
    "SD Prompt": "sd_prompt",
}
_TAG_SPLIT_RE = re.compile(r'[,\uff0c]')

_EMOJI_RE = re.compile(r'[\U0001F600-\U0001F9FF\u2600-\u27BF]')


def _safe_int(val) -> int:
    """Convert values such as ``'1.2万'``, ``'1234'`` or ``1234`` to an int.

    Numbers are truncated with ``int()``.  Strings containing ``万`` are
    multiplied by 10000 and *rounded*, because the float product can land
    just below the intended integer (``1.13 * 10000 == 11299.999999999998``).
    Anything that cannot be parsed yields 0.
    """
    try:
        if isinstance(val, (int, float)):
            return int(val)
        if not val:
            return 0
        s = str(val).strip()
        if "万" in s:
            return round(float(s.replace("万", "")) * 10000)
        return int(float(s))
    except (ValueError, OverflowError):
        # ValueError: unparsable text / NaN; OverflowError: infinity.
        return 0


def _scale_weight(avg: float, max_score: float) -> int:
    """Scale an average score to an integer weight in 0-100 (50 if no max)."""
    if max_score <= 0:
        return 50
    return min(100, int((avg / max_score) * 100))


def _by_weight_desc(weight_map: dict) -> list:
    """Return ``weight_map`` items sorted by their ``"weight"``, highest first."""
    return sorted(
        weight_map.items(), key=lambda kv: kv[1].get("weight", 0), reverse=True
    )


def _weight_bar(weight: int) -> str:
    """Render a 10-cell text progress bar for a 0-100 weight."""
    filled = weight // 10
    return "█" * filled + "░" * (10 - filled)


class AnalyticsService:
    """Note performance analysis and weight-learning engine.

    State is kept in two JSON files inside ``workspace_dir``:
    ``analytics_data.json`` (raw per-note data) and ``content_weights.json``
    (the weights derived from it).
    """

    def __init__(self, workspace_dir: str = "xhs_workspace"):
        self.workspace_dir = workspace_dir
        self.analytics_path = os.path.join(workspace_dir, ANALYTICS_FILE)
        self.weights_path = os.path.join(workspace_dir, WEIGHTS_FILE)
        self._analytics_data = self._load_json(
            self.analytics_path, {"notes": {}, "last_analysis": ""}
        )
        self._weights = self._load_json(self.weights_path, {
            "topic_weights": {},
            "style_weights": {},
            "tag_weights": {},
            "title_pattern_weights": {},
            "time_weights": {},
            "last_updated": "",
            "analysis_history": [],
        })

    # ========== Persistence ==========

    @staticmethod
    def _load_json(path: str, default: dict) -> dict:
        """Load a JSON object from ``path``; fall back to a copy of ``default``.

        The fallback is used when the file is missing, unreadable, not valid
        JSON, or when its top-level value is not an object.
        """
        if os.path.exists(path):
            try:
                with open(path, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
            except (ValueError, OSError) as e:
                # ValueError covers JSONDecodeError and UnicodeDecodeError.
                logger.warning("加载 %s 失败: %s,使用默认值", path, e)
            else:
                if isinstance(loaded, dict):
                    return loaded
                logger.warning(
                    "加载 %s 失败: %s,使用默认值",
                    path, "top-level JSON value is not an object",
                )
        return default.copy()

    def _write_json(self, path: str, payload: dict) -> None:
        """Serialize ``payload`` to ``path`` without corrupting the old file.

        The data is written to a sibling temp file and then moved into place
        with ``os.replace`` so a failure mid-dump leaves the previous file
        intact instead of a truncated one.
        """
        os.makedirs(self.workspace_dir, exist_ok=True)
        tmp_path = path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, path)

    def _save_analytics(self):
        """Persist the collected per-note data."""
        self._write_json(self.analytics_path, self._analytics_data)

    def _save_weights(self):
        """Persist the learned weights."""
        self._write_json(self.weights_path, self._weights)

    # ========== Response parsing helpers ==========

    @staticmethod
    def _extract_text(response) -> str:
        """Pull the first text payload out of an MCP tool response.

        Looks at ``response["raw"]["content"]`` first, then
        ``response["content"]`` (a list of ``{"type": "text", "text": ...}``
        items), and finally falls back to ``response["text"]``.  Returns an
        empty string when nothing usable is found.
        """
        if not isinstance(response, dict):
            return ""
        inner_raw = response.get("raw", {})
        content_list = inner_raw.get("content") if isinstance(inner_raw, dict) else None
        if not content_list:
            content_list = response.get("content")
        text = ""
        if isinstance(content_list, list):
            for item in content_list:
                if isinstance(item, dict) and item.get("type") == "text":
                    text = item.get("text", "")
                    break
        if not text:
            text = response.get("text", "")
        return text if isinstance(text, str) else ""

    @staticmethod
    def _parse_json_text(text: str):
        """Parse JSON out of ``text``, tolerating surrounding prose.

        Tries, in order: the whole text, the body of a fenced code block,
        and the span from the first ``{`` to the last ``}``.  Returns the
        first non-empty parsed value, or ``None``.
        """
        if not text:
            return None
        candidates = [text]
        for pattern in (_FENCED_JSON_RE, _BRACED_JSON_RE):
            match = pattern.search(text)
            if match:
                candidates.append(match.group(1))
        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except ValueError:
                continue
            if parsed:
                return parsed
        return None

    # ========== Data collection ==========

    def collect_note_performance(self, mcp_client, user_id: str, xsec_token: str) -> dict:
        """Fetch all of the user's notes via MCP and store their engagement data.

        Calls ``mcp_client.get_user_profile(user_id, xsec_token)``, merges
        each feed entry into ``analytics_data.json`` and saves it.

        Returns:
            ``{"total": N, "updated": M, "notes": [...]}`` on success, or
            ``{"total": 0, "updated": 0, "error": "..."}`` when the response
            cannot be parsed or contains no feeds.
        """
        logger.info("开始采集笔记表现数据 (user_id=%s)", user_id)
        raw = mcp_client.get_user_profile(user_id, xsec_token)

        data = self._parse_json_text(self._extract_text(raw))
        if not isinstance(data, dict):
            return {"total": 0, "updated": 0, "error": "无法解析用户数据"}

        feeds = data.get("feeds", [])
        if not feeds or not isinstance(feeds, list):
            return {"total": 0, "updated": 0, "error": "未找到笔记数据"}

        notes_dict = self._analytics_data.get("notes", {})
        # Read the local backups once instead of once per note.
        backups = self._load_local_backups()
        updated = 0
        note_summaries = []

        for feed in feeds:
            if not isinstance(feed, dict):
                logger.warning("跳过格式异常的笔记条目: %r", feed)
                continue
            card = feed.get("noteCard") or {}
            # In the profile feed the note ID lives in feed["id"], not in
            # noteCard["noteId"]; accept either.
            note_id = card.get("noteId") or feed.get("id", "") or feed.get("noteId", "")
            if not note_id:
                logger.warning("跳过无 ID 的笔记条目: keys=%s", list(feed.keys()))
                continue

            # The profile feed usually only carries likedCount; comment and
            # collect counts are on the detail page, so likes are the main
            # metric here.
            interact = card.get("interactInfo") or {}
            liked = _safe_int(interact.get("likedCount", 0))
            title = card.get("displayTitle", "") or ""

            # Topic / style / tags come from the locally backed-up copy.
            local_meta = self._find_local_meta(title, backups)
            note_data = {
                "note_id": note_id,
                "title": title,
                "type": card.get("type", "normal"),  # normal / video
                "likes": liked,
                "topic": local_meta.get("topic", ""),
                "style": local_meta.get("style", ""),
                "tags": local_meta.get("tags", []),
                "sd_prompt": local_meta.get("sd_prompt", ""),
                "collected_at": datetime.now().isoformat(),
            }

            old = notes_dict.get(note_id, {})
            if not old or old.get("likes", 0) != liked:
                updated += 1
            # Keep previously stored extra fields, overwrite the fresh ones.
            notes_dict[note_id] = {**old, **note_data}
            note_summaries.append(note_data)

        self._analytics_data["notes"] = notes_dict
        self._analytics_data["last_analysis"] = datetime.now().isoformat()
        self._save_analytics()

        logger.info("采集完成: 共 %d 篇笔记, 更新 %d 篇", len(feeds), updated)
        return {"total": len(feeds), "updated": updated, "notes": note_summaries}

    def collect_note_details(self, mcp_client, note_id: str, xsec_token: str):
        """Fetch detailed engagement counts for a single note.

        Returns a dict with ``likes``, ``comments_count``, ``collects`` and
        ``shares``, or ``None`` when the call fails or the response cannot be
        parsed.  Never raises: this is a best-effort lookup.
        """
        try:
            result = mcp_client.get_feed_detail(note_id, xsec_token, load_all_comments=False)
            data = self._parse_json_text(self._extract_text(result))
            if not isinstance(data, dict):
                return None
            interact = data.get("interactInfo") or {}
            comments = data.get("comments", [])
            return {
                "likes": _safe_int(interact.get("likedCount", 0)),
                # Fall back to the number of returned comments when the
                # response carries no explicit count.
                "comments_count": _safe_int(interact.get("commentCount", len(comments))),
                "collects": _safe_int(interact.get("collectedCount", 0)),
                "shares": _safe_int(interact.get("shareCount", 0)),
            }
        except Exception as e:  # boundary around a remote call: log, don't crash
            logger.warning("获取笔记 %s 详情失败: %s", note_id, e)
        return None

    def _load_local_backups(self) -> list:
        """Read every ``<workspace>/<dir>/文案.txt`` backup copy.

        Directories whose name starts with ``_`` are skipped.  Returns a list
        of ``(dirname, file_content)`` tuples; unreadable entries are logged
        and skipped.
        """
        backups = []
        try:
            dirnames = os.listdir(self.workspace_dir)
        except OSError as e:
            logger.debug("无法列出工作目录 %s: %s", self.workspace_dir, e)
            return backups
        for dirname in dirnames:
            dir_path = os.path.join(self.workspace_dir, dirname)
            if dirname.startswith("_") or not os.path.isdir(dir_path):
                continue
            txt_path = os.path.join(dir_path, "文案.txt")
            try:
                with open(txt_path, "r", encoding="utf-8") as f:
                    backups.append((dirname, f.read()))
            except FileNotFoundError:
                continue
            except (OSError, UnicodeDecodeError) as e:
                logger.debug("读取备份文案 %s 失败: %s", txt_path, e)
        return backups

    def _find_local_meta(self, title: str, backups: Optional[list] = None) -> dict:
        """Find the backup copy matching ``title`` and extract its metadata.

        A backup matches when the first 10 characters of the title occur in
        its text or the full title occurs in its directory name; the first
        match wins.

        Args:
            title: Note title to look up.
            backups: Optional pre-loaded ``(dirname, content)`` pairs from
                ``_load_local_backups``; loaded on demand when omitted.

        Returns:
            ``{"topic", "style", "tags", "sd_prompt"}`` with empty values for
            anything not found.
        """
        result = {"topic": "", "style": "", "tags": [], "sd_prompt": ""}
        if not title:
            return result
        if backups is None:
            backups = self._load_local_backups()

        for dirname, content in backups:
            if title[:10] not in content and title not in dirname:
                continue
            for line in content.split("\n"):
                match = _META_LINE_RE.match(line)
                if not match:
                    continue
                key = _META_KEYS[match.group(1)]
                value = match.group(2).strip()
                if key == "tags":
                    result[key] = [t.strip() for t in _TAG_SPLIT_RE.split(value) if t.strip()]
                else:
                    result[key] = value
            break
        return result

    # ========== Weight calculation ==========

    @staticmethod
    def _score_notes(notes: dict) -> list:
        """Attach an engagement score to every note, best first.

        score = likes * 1.0 + comments * 2.0 + collects * 1.5 (comments count
        double because they indicate deeper interaction).
        """
        scored = []
        for nid, note in notes.items():
            if not isinstance(note, dict):
                continue
            likes = _safe_int(note.get("likes", 0))
            comments_count = _safe_int(note.get("comments_count", 0))
            collects = _safe_int(note.get("collects", 0))
            score = likes * 1.0 + comments_count * 2.0 + collects * 1.5
            scored.append({**note, "score": score, "note_id": nid})
        scored.sort(key=lambda n: n["score"], reverse=True)
        return scored

    @staticmethod
    def _group_scores(scored_notes: list, keys_of) -> dict:
        """Group note scores under every key that ``keys_of(note)`` yields."""
        grouped = defaultdict(list)
        for note in scored_notes:
            for key in keys_of(note):
                grouped[key].append(note["score"])
        return grouped

    @staticmethod
    def _title_features(note: dict) -> list:
        """Return the title-pattern labels (punctuation, emoji, length) of a note."""
        title = note.get("title", "")
        if not title:
            return []
        checks = [
            ("含emoji", bool(_EMOJI_RE.search(title))),
            # Match both ASCII and full-width (\uff1f / \uff01) punctuation.
            ("疑问句式", "?" in title or "\uff1f" in title),
            ("感叹句式", "!" in title or "\uff01" in title),
            ("省略句式", "..." in title or "…" in title),
        ]
        features = [label for label, present in checks if present]
        if len(title) <= 10:
            length_bucket = "短(≤10)"
        elif len(title) <= 15:
            length_bucket = "中(11-15)"
        else:
            length_bucket = "长(16-20)"
        features.append(f"长度:{length_bucket}")
        return features

    @staticmethod
    def _time_bucket(note: dict) -> list:
        """Return the 3-hour bucket label of the note's ``collected_at`` stamp."""
        collected = note.get("collected_at", "")
        if not collected:
            return []
        try:
            hour = datetime.fromisoformat(collected).hour
        except (TypeError, ValueError):
            return []
        start = (hour // 3) * 3
        return [f"{start:02d}-{start + 3:02d}时"]

    @staticmethod
    def _text_key(note: dict, field: str) -> list:
        """Return ``[stripped value]`` of a text field, or ``[]`` when blank."""
        value = str(note.get(field) or "").strip()
        return [value] if value else []

    @staticmethod
    def _tag_keys(note: dict) -> list:
        """Return the note's tags, stripped and without a leading ``#``."""
        cleaned = (
            tag.strip().lstrip("#")
            for tag in (note.get("tags") or [])
            if isinstance(tag, str)
        )
        return [tag for tag in cleaned if tag]

    def calculate_weights(self) -> dict:
        """Compute per-dimension weights from the collected note data.

        Each note gets an engagement score (see ``_score_notes``); for every
        topic, style, tag, title pattern and time bucket the average score is
        scaled to 0-100 relative to the best note.  Topics confirmed by
        several notes get a small bonus (+5 for two notes, +10 for three or
        more).  The result is saved to ``content_weights.json``.

        Returns:
            A summary dict (``total_notes``, ``top_topics``, ``top_styles``,
            ``top_tags``, ``title_patterns``, ``top_note``) with the "top"
            lists ordered by weight, or ``{"error": ...}`` when there is
            nothing to analyse.
        """
        notes = self._analytics_data.get("notes", {})
        if not notes:
            return {"error": "暂无笔记数据,请先采集"}

        scored_notes = self._score_notes(notes)
        if not scored_notes:
            return {"error": "没有可分析的笔记"}

        best = scored_notes[0]
        max_score = best["score"] if best["score"] > 0 else 1

        # ---- Topic weights ----
        topic_weights = {}
        topic_groups = self._group_scores(scored_notes, lambda n: self._text_key(n, "topic"))
        for topic, scores in topic_groups.items():
            total_score = sum(scores)
            count = len(scores)
            avg_score = total_score / count
            weight = _scale_weight(avg_score, max_score)
            # Bonus for topics validated by more than one note.
            if count >= 3:
                weight = min(100, weight + 10)
            elif count >= 2:
                weight = min(100, weight + 5)
            topic_weights[topic] = {
                "weight": weight,
                "count": count,
                "avg_score": round(avg_score, 1),
                "total_score": round(total_score, 1),
            }

        # ---- Style weights ----
        style_weights = {}
        style_groups = self._group_scores(scored_notes, lambda n: self._text_key(n, "style"))
        for style, scores in style_groups.items():
            avg = sum(scores) / len(scores)
            style_weights[style] = {
                "weight": _scale_weight(avg, max_score),
                "count": len(scores),
                "avg_score": round(avg, 1),
            }

        # ---- Tag weights (top 30 kept) ----
        tag_weights = {}
        for tag, scores in self._group_scores(scored_notes, self._tag_keys).items():
            avg = sum(scores) / len(scores)
            tag_weights[tag] = {
                "weight": _scale_weight(avg, max_score),
                "count": len(scores),
            }

        # ---- Title pattern weights (emoji / sentence form / length) ----
        title_pattern_weights = {}
        for pattern, scores in self._group_scores(scored_notes, self._title_features).items():
            avg = sum(scores) / len(scores)
            title_pattern_weights[pattern] = {
                "weight": _scale_weight(avg, max_score),
                "count": len(scores),
                "avg_score": round(avg, 1),
            }

        # ---- Time weights ----
        # NOTE(review): buckets are derived from `collected_at`, i.e. when the
        # data was scraped, not when the note was published — confirm whether
        # a publish timestamp is available and intended here.
        time_weights = {}
        for bucket, scores in self._group_scores(scored_notes, self._time_bucket).items():
            avg = sum(scores) / len(scores)
            time_weights[bucket] = {
                "weight": _scale_weight(avg, max_score),
                "count": len(scores),
            }

        # Sort once so that the saved data, the history entry and the
        # returned summary all agree on what "top" means.
        topic_weights = dict(_by_weight_desc(topic_weights))
        style_weights = dict(_by_weight_desc(style_weights))
        tag_weights = dict(_by_weight_desc(tag_weights)[:30])

        # ---- Persist ----
        now = datetime.now().isoformat()
        self._weights.update({
            "topic_weights": topic_weights,
            "style_weights": style_weights,
            "tag_weights": tag_weights,
            "title_pattern_weights": title_pattern_weights,
            "time_weights": time_weights,
            "last_updated": now,
            "total_notes_analyzed": len(scored_notes),
            "top_note": {
                "title": best.get("title", ""),
                "score": best.get("score", 0),
                "likes": best.get("likes", 0),
            },
        })

        history = self._weights.get("analysis_history", [])
        history.append({
            "time": now,
            "total_notes": len(scored_notes),
            "avg_score": round(sum(n["score"] for n in scored_notes) / len(scored_notes), 1),
            "top_topic": next(iter(topic_weights), ""),
        })
        # Keep only the 50 most recent analysis runs.
        self._weights["analysis_history"] = history[-50:]
        self._save_weights()

        return {
            "total_notes": len(scored_notes),
            "top_topics": list(topic_weights.items())[:10],
            "top_styles": list(style_weights.items())[:5],
            "top_tags": list(tag_weights.items())[:10],
            "title_patterns": title_pattern_weights,
            "top_note": best,
        }

    # ========== Weighted selection ==========

    @staticmethod
    def _weighted_choice(learned: dict, base_items, fallback: str):
        """Pick one name, biased by learned weights.

        Without learned weights the choice is uniform over ``base_items``
        (or ``fallback`` when there are none).  Otherwise base items that
        have no learned weight yet enter the pool with weight 30, and every
        weight is floored at 1 so nothing is impossible to draw.

        Returns:
            ``(chosen, weight)`` where ``weight`` is ``None`` for a uniform pick.
        """
        if not learned:
            return (random.choice(base_items) if base_items else fallback), None
        pool = {name: info.get("weight", 50) for name, info in learned.items()}
        for item in base_items or []:
            pool.setdefault(item, 30)
        names = list(pool)
        chosen = random.choices(names, weights=[max(1, pool[n]) for n in names], k=1)[0]
        return chosen, pool[chosen]

    def get_weighted_topic(self, base_topics: Optional[list[str]] = None) -> str:
        """Pick a topic at random, weighted by the learned topic weights.

        Falls back to a uniform choice from ``base_topics`` (or ``"日常分享"``)
        when no weights have been learned yet.
        """
        chosen, weight = self._weighted_choice(
            self._weights.get("topic_weights", {}), base_topics, "日常分享"
        )
        if weight is not None:
            logger.info("加权选题: %s (权重: %s)", chosen, weight)
        return chosen

    def get_weighted_style(self, base_styles: Optional[list[str]] = None) -> str:
        """Pick a style at random, weighted by the learned style weights.

        Falls back to a uniform choice from ``base_styles`` (or ``"真实分享"``)
        when no weights have been learned yet.
        """
        chosen, _ = self._weighted_choice(
            self._weights.get("style_weights", {}), base_styles, "真实分享"
        )
        return chosen

    def get_top_tags(self, n: int = 8) -> list[str]:
        """Return the ``n`` highest-weighted tags (empty list if none)."""
        tag_weights = self._weights.get("tag_weights", {})
        if not tag_weights:
            return []
        return [tag for tag, _ in _by_weight_desc(tag_weights)[:n]]

    def get_title_advice(self) -> str:
        """Return a bullet list of the five best-performing title patterns."""
        patterns = self._weights.get("title_pattern_weights", {})
        if not patterns:
            return "暂无标题分析数据"
        return "\n".join(
            f" • {p_name}: 权重 {p_info['weight']}分 (出现{p_info['count']}次)"
            for p_name, p_info in _by_weight_desc(patterns)[:5]
        )

    # ========== LLM analysis ==========

    def generate_llm_analysis_prompt(self) -> str:
        """Build the data section of an LLM prompt from the 20 most-liked notes.

        Returns an empty string when no notes have been collected.
        """
        notes = self._analytics_data.get("notes", {})
        if not notes:
            return ""
        ranked = sorted(notes.values(), key=lambda note: note.get("likes", 0), reverse=True)
        lines = [
            f"#{rank} 「{note.get('title', '无标题')}」\n"
            f" 点赞: {note.get('likes', 0)} | 主题: {note.get('topic', '未知')} | "
            f"风格: {note.get('style', '未知')}\n"
            f" 标签: {', '.join(note.get('tags') or [])}"
            for rank, note in enumerate(ranked[:20], start=1)
        ]
        return "\n".join(lines)

    # ========== Report generation ==========

    def generate_report(self) -> str:
        """Render the learned weights as a Markdown report."""
        weights = self._weights
        notes = self._analytics_data.get("notes", {})
        if not notes:
            return "## 📊 暂无分析数据\n\n请先点击「采集数据」获取笔记表现数据,再点击「计算权重」。"

        total = len(notes)
        last_updated = weights.get("last_updated", "未知")

        top_note = weights.get("top_note", {})
        if top_note:
            top_note_str = f"**{top_note.get('title', '')}** (❤️ {top_note.get('likes', 0)})"
        else:
            top_note_str = "暂无"

        lines = [
            "## 📊 智能内容学习报告",
            "",
            # [:19] trims an ISO timestamp to second precision.
            f"🕐 最后更新: {last_updated[:19] if last_updated else '从未'}",
            f"📝 分析笔记数: **{total}** 篇",
            f"🏆 最佳笔记: {top_note_str}",
            "",
            "---",
            "",
        ]

        # Topic weights
        topic_w = weights.get("topic_weights", {})
        if topic_w:
            lines.append("### 🎯 主题权重排行")
            lines.append("| 排名 | 主题 | 权重 | 笔记数 | 平均得分 |")
            lines.append("|:---:|------|:---:|:---:|:---:|")
            for idx, (topic, info) in enumerate(list(topic_w.items())[:10]):
                bar = _weight_bar(info["weight"])
                lines.append(
                    f"| {idx+1} | {topic} | {bar} {info['weight']} | {info['count']} | {info['avg_score']} |"
                )
            lines.append("")

        # Style weights
        style_w = weights.get("style_weights", {})
        if style_w:
            lines.append("### 🎨 风格权重排行")
            for style, info in list(style_w.items())[:5]:
                bar = _weight_bar(info["weight"])
                lines.append(f"- **{style}**: {bar} {info['weight']}分 ({info['count']}篇)")
            lines.append("")

        # Tag weights
        tag_w = weights.get("tag_weights", {})
        if tag_w:
            lines.append("### 🏷️ 高权重标签 (Top 10)")
            tag_strs = [f"`#{t}` ({info['weight']})" for t, info in list(tag_w.items())[:10]]
            lines.append(" | ".join(tag_strs))
            lines.append("")

        # Title patterns
        title_p = weights.get("title_pattern_weights", {})
        if title_p:
            lines.append("### ✏️ 标题模式分析")
            for p_name, p_info in _by_weight_desc(title_p)[:6]:
                lines.append(f"- **{p_name}**: 权重 {p_info['weight']} (出现 {p_info['count']} 次)")
            lines.append("")

        # Suggestions
        lines.append("---")
        lines.append("### 💡 智能建议")
        if topic_w:
            top_3 = list(topic_w.keys())[:3]
            lines.append(f"- 📌 **高权重主题**: 优先创作 → {', '.join(top_3)}")
        if tag_w:
            hot_tags = [f"#{t}" for t in list(tag_w.keys())[:5]]
            lines.append(f"- 🏷️ **推荐标签**: {' '.join(hot_tags)}")
        if title_p:
            best_pattern = max(title_p.items(), key=lambda x: x[1].get("weight", 0))
            lines.append(f"- ✏️ **标题建议**: 多用「{best_pattern[0]}」(权重{best_pattern[1]['weight']})")
        lines.append("")
        lines.append("> 💡 启用「智能加权发布」后,自动发布将按权重倾斜生成高表现内容")

        return "\n".join(lines)

    def get_weighted_topics_display(self) -> str:
        """Return up to 15 topics, highest weight first, comma-separated (for the UI)."""
        topic_w = self._weights.get("topic_weights", {})
        if not topic_w:
            return ""
        return ", ".join(topic for topic, _ in _by_weight_desc(topic_w)[:15])

    @property
    def has_weights(self) -> bool:
        """Whether any topic weights have been learned yet."""
        return bool(self._weights.get("topic_weights"))

    @property
    def weights_summary(self) -> str:
        """One-line summary: number of analysed notes and the top three topics."""
        tw = self._weights.get("topic_weights", {})
        total = self._weights.get("total_notes_analyzed", 0)
        if not tw:
            return "暂无权重数据"
        top = list(tw.keys())[:3]
        return f"{total}篇笔记 | 热门: {', '.join(top)}"