xhs_factory/analytics_service.py
zhoujie d88b4e9a3b ♻️ refactor(config): 实现配置安全存储与原子写
- 新增 `get_secure()` 和 `set_secure()` 方法,优先从环境变量或系统 keyring 读取敏感配置,`config.json` 中仅存储占位符
- 将 `save()` 方法改为使用临时文件 + `os.replace()` 的原子写入,防止进程中断导致配置文件损坏
- 在 `add_llm_provider()` 和 `get_active_llm()` 中集成安全配置读写,自动迁移旧版明文 API Key

♻️ refactor(analytics): 实现分析数据原子写

- 将 `_save_analytics()` 和 `_save_weights()` 方法改为使用临时文件 + `os.replace()` 的原子写入
- 确保在写入过程中进程被终止时,原始数据文件保持完整

♻️ refactor(main): 增强发布功能健壮性与代码模块化

- 在 `publish_to_xhs()` 中增加发布前输入校验(标题长度、图片数量、文件存在性),并在 `finally` 块中自动清理本次生成的临时图片文件
- 为全局笔记列表缓存 `_cached_proactive_entries` 和 `_cached_my_note_entries` 引入 `threading.RLock` 保护,新增 `_set_cache()` 和 `_get_cache()` 线程安全操作函数
- 将「内容创作」Tab 的 UI 构建代码拆分至 `ui/tab_create.py` 模块,主文件通过 `build_tab()` 函数调用并组装
- 将 Gradio 应用的 CSS 和主题配置提取为模块级变量,提升可维护性

📦 build(deps): 新增 keyring 依赖

- 在 `requirements.txt` 中添加 `keyring>=24.0.0` 以支持系统凭证管理

📝 docs(openspec): 新增生产就绪审计文档

- 在 `openspec/changes/archive/2026-02-24-production-readiness-audit/` 下新增设计文档、提案、任务清单及各功能规格说明
- 将核心功能规格同步至 `openspec/specs/` 目录
2026-02-24 21:53:36 +08:00

666 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Note analytics & smart weight-learning module.

Periodically fetches interaction data for published notes, learns which
content performs well, and produces a weighted topic pool.
"""
import json
import os
import re
import tempfile
import time
import logging
import math
from datetime import datetime, timedelta
from collections import defaultdict
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# File name of the collected per-note metrics store; AnalyticsService joins it
# with its workspace directory.
ANALYTICS_FILE = "analytics_data.json"
# File name of the learned weights store; AnalyticsService joins it with its
# workspace directory.
WEIGHTS_FILE = "content_weights.json"
def _safe_int(val) -> int:
"""'1.2万' / '1234' / 1234 等格式转为整数"""
if isinstance(val, (int, float)):
return int(val)
if not val:
return 0
s = str(val).strip()
if "" in s:
try:
return int(float(s.replace("", "")) * 10000)
except ValueError:
return 0
try:
return int(float(s))
except ValueError:
return 0
class AnalyticsService:
    """Note performance analytics and weight-learning engine.

    Keeps two JSON documents inside ``workspace_dir``: the collected per-note
    metrics (``ANALYTICS_FILE``) and the weights derived from them
    (``WEIGHTS_FILE``). Both are loaded on construction and rewritten
    atomically whenever they change.
    """

    # Weight given to a base-pool entry that has no learned weight yet.
    _NEW_ITEM_WEIGHT = 30
    # Weight assumed for a learned entry whose record lacks a "weight" field.
    _DEFAULT_WEIGHT = 50

    # Title punctuation features; both ASCII and full-width/Unicode variants
    # are checked because titles are mostly Chinese text.
    _QUESTION_MARKS = ("?", "\uff1f")
    _EXCLAMATION_MARKS = ("!", "\uff01")
    _ELLIPSES = ("...", "\u2026")
    # Covers U+1F300-U+1FAFF (pictographs, emoticons, transport, supplemental
    # symbols) plus the U+2600-U+27BF misc-symbols/dingbats range.
    _EMOJI_RE = re.compile(r'[\U0001F300-\U0001FAFF\u2600-\u27BF]')

    def __init__(self, workspace_dir: str = "xhs_workspace"):
        """Load persisted analytics data and weights from ``workspace_dir``.

        Args:
            workspace_dir: Directory holding the two JSON stores and the local
                copy backups scanned by ``_find_local_meta``.
        """
        self.workspace_dir = workspace_dir
        self.analytics_path = os.path.join(workspace_dir, ANALYTICS_FILE)
        self.weights_path = os.path.join(workspace_dir, WEIGHTS_FILE)
        self._analytics_data = self._load_json(
            self.analytics_path, {"notes": {}, "last_analysis": ""}
        )
        self._weights = self._load_json(self.weights_path, {
            "topic_weights": {},
            "style_weights": {},
            "tag_weights": {},
            "title_pattern_weights": {},
            "time_weights": {},
            "last_updated": "",
            "analysis_history": [],
        })

    # ========== Persistence ==========

    @staticmethod
    def _load_json(path: str, default: dict) -> dict:
        """Return the JSON object stored at ``path``, or a copy of ``default``.

        The default is used when the file is missing, unreadable, not valid
        JSON, or does not contain a JSON object at the top level.
        """
        if os.path.exists(path):
            try:
                with open(path, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
            except (ValueError, OSError) as e:
                # ValueError covers JSONDecodeError and UnicodeDecodeError.
                logger.warning("加载 %s 失败: %s,使用默认值", path, e)
            else:
                if isinstance(loaded, dict):
                    return loaded
                logger.warning(
                    "加载 %s 失败: %s,使用默认值",
                    path, "top-level JSON value is not an object",
                )
        return default.copy()

    @staticmethod
    def _atomic_write_json(target: str, payload: dict, prefix: str) -> None:
        """Write ``payload`` as JSON to ``target`` via temp file + os.replace.

        The temp file is created in the target's own directory so that
        ``os.replace`` stays on one filesystem. On any failure (including
        KeyboardInterrupt) the temp file is removed and the error re-raised,
        leaving the previous ``target`` untouched.
        """
        target_dir = os.path.dirname(os.path.abspath(target))
        os.makedirs(target_dir, exist_ok=True)
        fd, tmp = tempfile.mkstemp(dir=target_dir, suffix=".tmp", prefix=prefix)
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)
            os.replace(tmp, target)
        except BaseException:
            try:
                os.remove(tmp)
            except OSError:
                pass
            raise

    def _save_analytics(self):
        """Atomically persist the collected note metrics."""
        os.makedirs(self.workspace_dir, exist_ok=True)
        self._atomic_write_json(self.analytics_path, self._analytics_data, "analytics_")

    def _save_weights(self):
        """Atomically persist the learned weights."""
        os.makedirs(self.workspace_dir, exist_ok=True)
        self._atomic_write_json(self.weights_path, self._weights, "weights_")

    # ========== Data collection ==========

    @staticmethod
    def _extract_response_text(response) -> str:
        """Pull the first text payload out of an MCP tool response.

        Looks at ``response["raw"]["content"]`` first, then
        ``response["content"]``, taking the first item whose ``type`` is
        ``"text"``; falls back to ``response["text"]``. Returns ``""`` when
        nothing usable is found.
        """
        if not isinstance(response, dict):
            return ""
        inner = response.get("raw", {})
        content = inner.get("content") if isinstance(inner, dict) else None
        if not content:
            content = response.get("content")
        text = ""
        for item in content or []:
            if isinstance(item, dict) and item.get("type") == "text":
                text = item.get("text", "")
                break
        if not text:
            text = response.get("text", "")
        return text if isinstance(text, str) else ""

    @staticmethod
    def _parse_json_payload(text: str):
        """Parse a non-empty JSON object out of ``text``, or return None.

        Tries, in order: the whole text, the body of a fenced code block,
        and the span from the first ``{`` to the last ``}``.
        """
        if not text:
            return None
        candidates = [text]
        fenced = re.search(r'```(?:json)?\s*\n([\s\S]+?)\n```', text)
        if fenced:
            candidates.append(fenced.group(1))
        braced = re.search(r'(\{[\s\S]*\})', text)
        if braced:
            candidates.append(braced.group(1))
        for candidate in candidates:
            try:
                data = json.loads(candidate)
            except ValueError:
                continue
            # Callers use .get(), so only a non-empty object is acceptable.
            if isinstance(data, dict) and data:
                return data
        return None

    def collect_note_performance(self, mcp_client, user_id: str, xsec_token: str) -> dict:
        """Fetch the user's notes via MCP and store their metrics.

        Calls ``mcp_client.get_user_profile`` and merges each feed entry into
        the analytics store, which is then saved.

        Returns:
            ``{"total": N, "updated": M, "notes": [...]}`` on success, or
            ``{"total": 0, "updated": 0, "error": "..."}`` when the response
            cannot be parsed or contains no feeds.
        """
        logger.info("开始采集笔记表现数据 (user_id=%s)", user_id)
        raw = mcp_client.get_user_profile(user_id, xsec_token)
        data = self._parse_json_payload(self._extract_response_text(raw))
        if not data:
            return {"total": 0, "updated": 0, "error": "无法解析用户数据"}
        feeds = data.get("feeds", [])
        if not feeds or not isinstance(feeds, list):
            return {"total": 0, "updated": 0, "error": "未找到笔记数据"}

        notes_dict = self._analytics_data.get("notes", {})
        updated = 0
        note_summaries = []
        for feed in feeds:
            if not isinstance(feed, dict):
                logger.warning("跳过无 ID 的笔记条目: keys=%s", [])
                continue
            card = feed.get("noteCard") or {}
            # In the profile feed the note ID lives on the feed entry itself
            # rather than on noteCard, so both locations are checked.
            note_id = card.get("noteId") or feed.get("id", "") or feed.get("noteId", "")
            if not note_id:
                logger.warning("跳过无 ID 的笔记条目: keys=%s", list(feed.keys()))
                continue
            interact = card.get("interactInfo") or {}
            # Only likedCount is read here; comment/collect counts come from
            # collect_note_details.
            liked = _safe_int(interact.get("likedCount", 0))
            title = card.get("displayTitle", "") or ""
            local_meta = self._find_local_meta(title)
            note_data = {
                "note_id": note_id,
                "title": title,
                "type": card.get("type", "normal"),  # normal / video
                "likes": liked,
                "topic": local_meta.get("topic", ""),
                "style": local_meta.get("style", ""),
                "tags": local_meta.get("tags", []),
                "sd_prompt": local_meta.get("sd_prompt", ""),
                "collected_at": datetime.now().isoformat(),
            }
            old = notes_dict.get(note_id, {})
            # Do not wipe previously learned metadata when the local backup
            # can no longer be matched (e.g. it was deleted or renamed).
            for key in ("topic", "style", "tags", "sd_prompt"):
                if not note_data[key] and old.get(key):
                    note_data[key] = old[key]
            if not old or old.get("likes", 0) != liked:
                updated += 1
            notes_dict[note_id] = {**old, **note_data}
            note_summaries.append(note_data)

        self._analytics_data["notes"] = notes_dict
        self._analytics_data["last_analysis"] = datetime.now().isoformat()
        self._save_analytics()
        logger.info("采集完成: 共 %d 篇笔记, 更新 %d", len(feeds), updated)
        return {"total": len(feeds), "updated": updated, "notes": note_summaries}

    def collect_note_details(self, mcp_client, note_id: str, xsec_token: str):
        """Fetch detailed interaction counts for one note.

        Returns:
            A dict with ``likes``, ``comments_count``, ``collects`` and
            ``shares``, or ``None`` when the call fails or the response
            cannot be parsed. Failures are logged, never raised.
        """
        try:
            result = mcp_client.get_feed_detail(note_id, xsec_token, load_all_comments=False)
            data = self._parse_json_payload(self._extract_response_text(result))
            if data:
                interact = data.get("interactInfo") or {}
                comments = data.get("comments") or []
                return {
                    "likes": _safe_int(interact.get("likedCount", 0)),
                    "comments_count": _safe_int(interact.get("commentCount", len(comments))),
                    "collects": _safe_int(interact.get("collectedCount", 0)),
                    "shares": _safe_int(interact.get("shareCount", 0)),
                }
        except Exception as e:
            # Boundary with an external client: best-effort, log and move on.
            logger.warning("获取笔记 %s 详情失败: %s", note_id, e)
        return None

    def _find_local_meta(self, title: str) -> dict:
        """Look up topic/style/tags/SD prompt for ``title`` in local backups.

        Scans each sub-directory of the workspace (skipping names starting
        with ``_``) for a ``文案.txt`` file. The first backup whose text
        contains the first 10 characters of the title, or whose directory
        name contains the title, is parsed line by line.

        Returns:
            A dict with keys ``topic``, ``style``, ``tags``, ``sd_prompt``;
            values stay empty when nothing matches.
        """
        result = {"topic": "", "style": "", "tags": [], "sd_prompt": ""}
        if not title:
            return result
        try:
            entries = os.listdir(self.workspace_dir)
        except OSError:
            return result

        for dirname in entries:
            dir_path = os.path.join(self.workspace_dir, dirname)
            if dirname.startswith("_") or not os.path.isdir(dir_path):
                continue
            txt_path = os.path.join(dir_path, "文案.txt")
            try:
                with open(txt_path, "r", encoding="utf-8") as f:
                    content = f.read()
            except (OSError, UnicodeDecodeError):
                continue
            if title[:10] not in content and title not in dirname:
                continue
            for line in content.split("\n"):
                value = line.partition(":")[2].strip()
                if line.startswith("风格:"):
                    result["style"] = value
                elif line.startswith("主题:"):
                    result["topic"] = value
                elif line.startswith("标签:"):
                    result["tags"] = [t.strip() for t in value.split(",") if t.strip()]
                elif line.startswith("SD Prompt:"):
                    result["sd_prompt"] = value
            break
        return result

    # ========== Weight calculation ==========

    @staticmethod
    def _normalized_weight(avg_score: float, max_score: float) -> int:
        """Scale ``avg_score`` against ``max_score`` into an int in 0..100."""
        if max_score <= 0:
            return 50
        return max(0, min(100, int(avg_score * 100 / max_score)))

    def calculate_weights(self) -> dict:
        """Derive per-dimension weights from the collected notes and save them.

        Each note is scored as ``likes * 1.0 + comments * 2.0 + collects * 1.5``
        and averages per topic, style, tag, title pattern and time bucket are
        normalised to 0-100 against the best note's score.

        Returns:
            A summary dict (``total_notes``, ``top_topics``, ``top_styles``,
            ``top_tags``, ``title_patterns``, ``top_note``) with the "top"
            lists ordered by weight, or ``{"error": "..."}`` when there is
            nothing to analyse.
        """
        notes = self._analytics_data.get("notes", {})
        if not notes:
            return {"error": "暂无笔记数据,请先采集"}

        scored_notes = []
        for nid, note in notes.items():
            if not isinstance(note, dict):
                continue
            likes = note.get("likes") or 0
            comments_count = note.get("comments_count") or 0
            collects = note.get("collects") or 0
            # Comments weigh most (deeper engagement), then collects, then likes.
            score = likes * 1.0 + comments_count * 2.0 + collects * 1.5
            scored_notes.append({**note, "score": score, "note_id": nid})
        if not scored_notes:
            return {"error": "没有可分析的笔记"}

        scored_notes.sort(key=lambda n: n["score"], reverse=True)
        best_note = scored_notes[0]
        max_score = best_note["score"] if best_note["score"] > 0 else 1

        def by_weight(item):
            return item[1]["weight"]

        # ---- Topic weights ----
        topic_scores = defaultdict(list)
        for note in scored_notes:
            topic = (note.get("topic") or "").strip()
            if topic:
                topic_scores[topic].append(note["score"])
        topic_weights = {}
        for topic, scores in topic_scores.items():
            total_score = sum(scores)
            avg_score = total_score / len(scores)
            weight = self._normalized_weight(avg_score, max_score)
            # Bonus for topics confirmed by several notes.
            if len(scores) >= 3:
                weight = min(100, weight + 10)
            elif len(scores) >= 2:
                weight = min(100, weight + 5)
            topic_weights[topic] = {
                "weight": weight,
                "count": len(scores),
                "avg_score": round(avg_score, 1),
                "total_score": round(total_score, 1),
            }
        topic_weights = dict(sorted(topic_weights.items(), key=by_weight, reverse=True))

        # ---- Style weights ----
        style_scores = defaultdict(list)
        for note in scored_notes:
            style = (note.get("style") or "").strip()
            if style:
                style_scores[style].append(note["score"])
        style_weights = {}
        for style, scores in style_scores.items():
            avg = sum(scores) / len(scores)
            style_weights[style] = {
                "weight": self._normalized_weight(avg, max_score),
                "count": len(scores),
                "avg_score": round(avg, 1),
            }
        style_weights = dict(sorted(style_weights.items(), key=by_weight, reverse=True))

        # ---- Tag weights (top 30 kept) ----
        tag_scores = defaultdict(list)
        for note in scored_notes:
            for tag in note.get("tags") or []:
                tag = str(tag).strip().lstrip("#")
                if tag:
                    tag_scores[tag].append(note["score"])
        tag_weights = {
            tag: {
                "weight": self._normalized_weight(sum(scores) / len(scores), max_score),
                "count": len(scores),
            }
            for tag, scores in tag_scores.items()
        }
        tag_weights = dict(sorted(tag_weights.items(), key=by_weight, reverse=True)[:30])

        # ---- Title pattern weights (emoji / sentence form / length) ----
        title_patterns = defaultdict(list)
        for note in scored_notes:
            title = note.get("title") or ""
            if not title:
                continue
            features = [
                ("含emoji", bool(self._EMOJI_RE.search(title))),
                ("疑问句式", any(mark in title for mark in self._QUESTION_MARKS)),
                ("感叹句式", any(mark in title for mark in self._EXCLAMATION_MARKS)),
                ("省略句式", any(mark in title for mark in self._ELLIPSES)),
            ]
            for feature, present in features:
                if present:
                    title_patterns[feature].append(note["score"])
            if len(title) <= 10:
                length_bucket = "短(≤10)"
            elif len(title) <= 15:
                length_bucket = "中(11-15)"
            else:
                length_bucket = "长(16-20)"
            title_patterns[f"长度:{length_bucket}"].append(note["score"])
        title_pattern_weights = {}
        for pattern, scores in title_patterns.items():
            avg = sum(scores) / len(scores) if scores else 0
            title_pattern_weights[pattern] = {
                "weight": self._normalized_weight(avg, max_score),
                "count": len(scores),
                "avg_score": round(avg, 1),
            }

        # ---- Time-bucket weights ----
        # NOTE(review): buckets are derived from "collected_at", which
        # collect_note_performance sets to the collection time, not the
        # publish time — confirm whether a publish timestamp is available.
        time_scores = defaultdict(list)
        for note in scored_notes:
            collected = note.get("collected_at", "")
            if not collected:
                continue
            try:
                dt = datetime.fromisoformat(collected)
            except (ValueError, TypeError):
                continue
            start = (dt.hour // 3) * 3
            time_scores[f"{start:02d}-{start + 3:02d}"].append(note["score"])
        time_weights = {}
        for bucket, scores in time_scores.items():
            avg = sum(scores) / len(scores) if scores else 0
            time_weights[bucket] = {
                "weight": self._normalized_weight(avg, max_score),
                "count": len(scores),
            }

        # ---- Persist ----
        now = datetime.now().isoformat()
        self._weights.update({
            "topic_weights": topic_weights,
            "style_weights": style_weights,
            "tag_weights": tag_weights,
            "title_pattern_weights": title_pattern_weights,
            "time_weights": time_weights,
            "last_updated": now,
            "total_notes_analyzed": len(scored_notes),
            "top_note": {
                "title": best_note.get("title", ""),
                "score": best_note.get("score", 0),
                "likes": best_note.get("likes", 0),
            },
        })
        history = self._weights.get("analysis_history") or []
        history.append({
            "time": now,
            "total_notes": len(scored_notes),
            "avg_score": round(sum(n["score"] for n in scored_notes) / len(scored_notes), 1),
            # topic_weights is already sorted, so the first key is the top topic.
            "top_topic": next(iter(topic_weights), ""),
        })
        # Keep only the most recent 50 history entries.
        self._weights["analysis_history"] = history[-50:]
        self._save_weights()

        return {
            "total_notes": len(scored_notes),
            "top_topics": list(topic_weights.items())[:10],
            "top_styles": list(style_weights.items())[:5],
            "top_tags": list(tag_weights.items())[:10],
            "title_patterns": title_pattern_weights,
            "top_note": best_note,
        }

    # ========== Weighted selection ==========

    @classmethod
    def _weighted_choice(cls, learned: dict, base_items, fallback: str):
        """Pick one name at random, biased by learned weights.

        Args:
            learned: Mapping of name -> record with an optional "weight".
            base_items: Optional extra candidates; ones without a learned
                weight join the pool at ``_NEW_ITEM_WEIGHT``.
            fallback: Returned when there is neither learned data nor a base
                pool.

        Returns:
            ``(chosen, weight)``; ``weight`` is None when the pick was
            uniform because no learned weights exist.
        """
        import random

        if not learned:
            return (random.choice(base_items) if base_items else fallback), None
        pool = {
            name: info.get("weight", cls._DEFAULT_WEIGHT) if isinstance(info, dict)
            else cls._DEFAULT_WEIGHT
            for name, info in learned.items()
        }
        for item in base_items or []:
            pool.setdefault(item, cls._NEW_ITEM_WEIGHT)
        names = list(pool)
        # random.choices needs a positive total, so floor every weight at 1.
        draw_weights = [max(1, pool[name]) for name in names]
        chosen = random.choices(names, weights=draw_weights, k=1)[0]
        return chosen, pool[chosen]

    def get_weighted_topic(self, base_topics: list[str] = None) -> str:
        """Pick a topic at random, weighted by learned topic weights.

        Falls back to a uniform pick from ``base_topics`` (or "日常分享" when
        that is empty) if no weights have been learned yet.
        """
        chosen, weight = self._weighted_choice(
            self._weights.get("topic_weights", {}), base_topics, "日常分享"
        )
        if weight is not None:
            logger.info("加权选题: %s (权重: %s)", chosen, weight)
        return chosen

    def get_weighted_style(self, base_styles: list[str] = None) -> str:
        """Pick a style at random, weighted by learned style weights.

        Falls back to a uniform pick from ``base_styles`` (or "真实分享" when
        that is empty) if no weights have been learned yet.
        """
        chosen, _ = self._weighted_choice(
            self._weights.get("style_weights", {}), base_styles, "真实分享"
        )
        return chosen

    def get_top_tags(self, n: int = 8) -> list[str]:
        """Return the ``n`` tags with the highest learned weight."""
        tag_weights = self._weights.get("tag_weights", {})
        if not tag_weights:
            return []
        ranked = sorted(tag_weights.items(), key=lambda kv: kv[1].get("weight", 0), reverse=True)
        return [tag for tag, _ in ranked[:n]]

    def get_title_advice(self) -> str:
        """Return the five strongest title patterns, one per line."""
        patterns = self._weights.get("title_pattern_weights", {})
        if not patterns:
            return "暂无标题分析数据"
        ranked = sorted(patterns.items(), key=lambda kv: kv[1].get("weight", 0), reverse=True)
        return "\n".join(
            f"{name}: 权重 {info.get('weight', 0)}分 (出现{info.get('count', 0)}次)"
            for name, info in ranked[:5]
        )

    # ========== LLM deep analysis ==========

    def generate_llm_analysis_prompt(self) -> str:
        """Build the data section of an LLM prompt from the top 20 notes by likes.

        Returns ``""`` when no notes have been collected.
        """
        notes = self._analytics_data.get("notes", {})
        if not notes:
            return ""
        ranked = sorted(notes.values(), key=lambda n: n.get("likes", 0), reverse=True)
        lines = []
        for rank, note in enumerate(ranked[:20], start=1):
            # `or` (not a .get default): the keys exist but are stored as "".
            tags = ", ".join(str(t) for t in note.get("tags") or [])
            lines.append(
                f"#{rank} {note.get('title') or '无标题'}\n"
                f" 点赞: {note.get('likes', 0)} | 主题: {note.get('topic') or '未知'} | "
                f"风格: {note.get('style') or '未知'}\n"
                f" 标签: {tags}"
            )
        return "\n".join(lines)

    # ========== Report generation ==========

    @staticmethod
    def _render_bar(weight) -> str:
        """Render a 0-100 weight as a 10-cell text progress bar."""
        filled = max(0, min(10, int(weight) // 10))
        return "█" * filled + "░" * (10 - filled)

    def generate_report(self) -> str:
        """Render the learned weights as a Markdown report."""
        weights = self._weights
        notes = self._analytics_data.get("notes", {})
        if not notes:
            return "## 📊 暂无分析数据\n\n请先点击「采集数据」获取笔记表现数据,再点击「计算权重」。"

        total = len(notes)
        last_updated = weights.get("last_updated", "未知")
        top_note = weights.get("top_note", {})
        if top_note:
            top_note_str = f"**{top_note.get('title', '')}** (❤️ {top_note.get('likes', 0)})"
        else:
            top_note_str = "暂无"

        lines = [
            "## 📊 智能内容学习报告",
            "",
            f"🕐 最后更新: {last_updated[:19] if last_updated else '从未'}",
            f"📝 分析笔记数: **{total}** 篇",
            f"🏆 最佳笔记: {top_note_str}",
            "",
            "---",
            "",
        ]

        # Topic ranking table
        topic_w = weights.get("topic_weights", {})
        if topic_w:
            lines.append("### 🎯 主题权重排行")
            lines.append("| 排名 | 主题 | 权重 | 笔记数 | 平均得分 |")
            lines.append("|:---:|------|:---:|:---:|:---:|")
            for rank, (topic, info) in enumerate(list(topic_w.items())[:10], start=1):
                weight = info.get("weight", 0)
                lines.append(
                    f"| {rank} | {topic} | {self._render_bar(weight)} {weight} | "
                    f"{info.get('count', 0)} | {info.get('avg_score', 0)} |"
                )
            lines.append("")

        # Style ranking
        style_w = weights.get("style_weights", {})
        if style_w:
            lines.append("### 🎨 风格权重排行")
            for style, info in list(style_w.items())[:5]:
                weight = info.get("weight", 0)
                lines.append(
                    f"- **{style}**: {self._render_bar(weight)} {weight}分 ({info.get('count', 0)}篇)"
                )
            lines.append("")

        # Top tags
        tag_w = weights.get("tag_weights", {})
        if tag_w:
            lines.append("### 🏷️ 高权重标签 (Top 10)")
            lines.append(" | ".join(
                f"`#{tag}` ({info.get('weight', 0)})" for tag, info in list(tag_w.items())[:10]
            ))
            lines.append("")

        # Title patterns
        title_p = weights.get("title_pattern_weights", {})
        if title_p:
            lines.append("### ✏️ 标题模式分析")
            ranked = sorted(title_p.items(), key=lambda kv: kv[1].get("weight", 0), reverse=True)
            for name, info in ranked[:6]:
                lines.append(
                    f"- **{name}**: 权重 {info.get('weight', 0)} (出现 {info.get('count', 0)} 次)"
                )
            lines.append("")

        # Suggestions
        lines.append("---")
        lines.append("### 💡 智能建议")
        if topic_w:
            top_3 = list(topic_w.keys())[:3]
            lines.append(f"- 📌 **高权重主题**: 优先创作 → {', '.join(top_3)}")
        if tag_w:
            hot_tags = [f"#{tag}" for tag in list(tag_w.keys())[:5]]
            lines.append(f"- 🏷️ **推荐标签**: {' '.join(hot_tags)}")
        if title_p:
            best_name, best_info = max(title_p.items(), key=lambda kv: kv[1].get("weight", 0))
            lines.append(
                f"- ✏️ **标题建议**: 多用「{best_name}」(权重{best_info.get('weight', 0)})"
            )
        lines.append("")
        lines.append("> 💡 启用「智能加权发布」后,自动发布将按权重倾斜生成高表现内容")
        return "\n".join(lines)

    def get_weighted_topics_display(self) -> str:
        """Return up to 15 topics, highest weight first, comma-separated (for the UI)."""
        topic_w = self._weights.get("topic_weights", {})
        if not topic_w:
            return ""
        ranked = sorted(topic_w.items(), key=lambda kv: kv[1].get("weight", 0), reverse=True)
        return ", ".join(topic for topic, _ in ranked[:15])

    @property
    def has_weights(self) -> bool:
        """Whether any topic weights have been learned."""
        return bool(self._weights.get("topic_weights"))

    @property
    def weights_summary(self) -> str:
        """One-line summary: notes analysed and the first three topics."""
        topic_w = self._weights.get("topic_weights", {})
        total = self._weights.get("total_notes_analyzed", 0)
        if not topic_w:
            return "暂无权重数据"
        top = list(topic_w.keys())[:3]
        return f"{total}篇笔记 | 热门: {', '.join(top)}"