- 优化用户资料和笔记详情的数据提取逻辑,优先从 `raw["raw"]["content"]` 获取内容,并回退到 `raw["content"]` - 在笔记详情解析中,增加从 `result["text"]` 提取文本的备用路径 - 在用户动态流解析中,优先从 `f["id"]` 获取笔记 ID,并增加无 ID 条目的日志警告 ✨ feat(persona): 扩展人设池并集成视觉风格配置 - 新增“赛博AI虚拟博主”和“性感福利主播”人设及其对应的主题与关键词 - 在 `sd_service.py` 中新增 `PERSONA_SD_PROFILES` 字典,为每个人设定义视觉增强词、风格后缀和 LLM 绘图指导 - 新增 `get_persona_sd_profile` 函数,根据人设文本匹配对应的视觉配置 ♻️ refactor(llm): 重构 SD 绘图提示词生成以支持人设 - 修改 `LLMService.get_sd_prompt_guide` 函数签名,新增 `persona` 参数 - 在生成的绘图指南中,根据匹配到的人设追加特定的视觉风格指导文本 - 针对“赛博AI虚拟博主”人设,调整反 AI 检测提示,允许使用高质量词汇和专业光效 - 更新所有调用 `get_sd_prompt_guide` 的地方(如文案生成函数),传入 `persona` 参数 ♻️ refactor(sd): 重构文生图服务以支持人设视觉增强 - 修改 `SDService.txt2img` 函数签名,新增 `persona` 参数 - 在生成最终提示词时,注入人设特定的增强词(`prompt_boost`)和风格词(`prompt_style`) - 在生成最终负面提示词时,追加人设特定的额外负面词(`negative_extra`) - 增加人设视觉增强已注入的日志信息 🔧 chore(config): 更新默认人设配置 - 将 `config_manager.py` 中的默认 `persona` 从“身材管理健身美女”更新为“性感福利主播” 🔧 chore(main): 更新 UI 函数签名以传递人设参数 - 更新 `generate_images` 函数签名,新增 `persona_text` 参数,并在内部解析为人设对象 - 更新 `auto_publish_once` 和 `generate_to_queue` 函数中调用 `sd_svc.txt2img` 的地方,传入 `persona` 参数 - 更新 Gradio 界面中 `btn_gen_img` 的点击事件,将 `persona` 输入传递给 `generate_images` 函数
643 lines
26 KiB
Python
643 lines
26 KiB
Python
"""
|
||
笔记数据分析 & 智能权重学习模块
|
||
定时抓取已发布笔记的互动数据,自动学习哪些内容受欢迎,生成加权主题池
|
||
"""
|
||
import json
|
||
import os
|
||
import re
|
||
import time
|
||
import logging
|
||
import math
|
||
from datetime import datetime, timedelta
|
||
from collections import defaultdict
|
||
|
||
# Module-level logger; handler/level configuration is left to the application.
logger = logging.getLogger(__name__)


# Persisted-state filenames (relative to the service's workspace directory).
ANALYTICS_FILE = "analytics_data.json"  # raw per-note performance snapshots
WEIGHTS_FILE = "content_weights.json"  # learned content weights derived from analytics
|
||
|
||
|
||
def _safe_int(val) -> int:
|
||
"""将 '1.2万' / '1234' / 1234 等格式转为整数"""
|
||
if isinstance(val, (int, float)):
|
||
return int(val)
|
||
if not val:
|
||
return 0
|
||
s = str(val).strip()
|
||
if "万" in s:
|
||
try:
|
||
return int(float(s.replace("万", "")) * 10000)
|
||
except ValueError:
|
||
return 0
|
||
try:
|
||
return int(float(s))
|
||
except ValueError:
|
||
return 0
|
||
|
||
|
||
class AnalyticsService:
|
||
"""笔记表现分析 & 权重学习引擎"""
|
||
|
||
def __init__(self, workspace_dir: str = "xhs_workspace"):
    """Set up file paths and load any previously persisted state.

    Missing or corrupt files silently fall back to empty default structures,
    so a fresh workspace starts with no notes and no learned weights.
    """
    self.workspace_dir = workspace_dir
    self.analytics_path = os.path.join(workspace_dir, ANALYTICS_FILE)
    self.weights_path = os.path.join(workspace_dir, WEIGHTS_FILE)

    default_analytics = {"notes": {}, "last_analysis": ""}
    default_weights = {
        "topic_weights": {},
        "style_weights": {},
        "tag_weights": {},
        "title_pattern_weights": {},
        "time_weights": {},
        "last_updated": "",
        "analysis_history": [],
    }
    self._analytics_data = self._load_json(self.analytics_path, default_analytics)
    self._weights = self._load_json(self.weights_path, default_weights)
|
||
|
||
# ========== 持久化 ==========
|
||
|
||
@staticmethod
|
||
def _load_json(path: str, default: dict) -> dict:
|
||
if os.path.exists(path):
|
||
try:
|
||
with open(path, "r", encoding="utf-8") as f:
|
||
return json.load(f)
|
||
except (json.JSONDecodeError, IOError) as e:
|
||
logger.warning("加载 %s 失败: %s,使用默认值", path, e)
|
||
return default.copy()
|
||
|
||
def _save_analytics(self):
|
||
os.makedirs(self.workspace_dir, exist_ok=True)
|
||
with open(self.analytics_path, "w", encoding="utf-8") as f:
|
||
json.dump(self._analytics_data, f, ensure_ascii=False, indent=2)
|
||
|
||
def _save_weights(self):
|
||
os.makedirs(self.workspace_dir, exist_ok=True)
|
||
with open(self.weights_path, "w", encoding="utf-8") as f:
|
||
json.dump(self._weights, f, ensure_ascii=False, indent=2)
|
||
|
||
# ========== 数据采集 ==========
|
||
|
||
def collect_note_performance(self, mcp_client, user_id: str, xsec_token: str) -> dict:
    """
    Fetch all of our published notes plus their engagement data via MCP and
    persist them into analytics_data.json.

    Args:
        mcp_client: client exposing get_user_profile(user_id, xsec_token).
        user_id: XHS user id whose profile feed is scraped.
        xsec_token: security token required by the MCP endpoint.

    Returns:
        {"total": N, "updated": M, "notes": [...]} on success, or a dict with
        an "error" key when the MCP payload cannot be parsed.
    """
    logger.info("开始采集笔记表现数据 (user_id=%s)", user_id)

    raw = mcp_client.get_user_profile(user_id, xsec_token)
    text = ""
    if isinstance(raw, dict):
        # _call_tool returns {"success": True, "text": "...", "raw": <raw MCP response>}.
        # Prefer raw["raw"]["content"]; fall back to a top-level "content" key.
        inner_raw = raw.get("raw", {})
        content_list = []
        if isinstance(inner_raw, dict):
            content_list = inner_raw.get("content", [])
        if not content_list:
            content_list = raw.get("content", [])
        # First text-typed item wins; final fallback is the wrapper's "text".
        for item in content_list:
            if isinstance(item, dict) and item.get("type") == "text":
                text = item.get("text", "")
                break
        if not text:
            text = raw.get("text", "")

    # Parse the JSON payload: plain JSON first, then a fenced ```json block,
    # finally the outermost {...} span found anywhere in the text.
    data = None
    for attempt_fn in [
        lambda t: json.loads(t),
        lambda t: json.loads(re.search(r'```(?:json)?\s*\n([\s\S]+?)\n```', t).group(1)),
        lambda t: json.loads(re.search(r'(\{[\s\S]*\})', t).group(1)),
    ]:
        try:
            data = attempt_fn(text)
            if data:
                break
        except Exception:
            continue

    if not data:
        return {"total": 0, "updated": 0, "error": "无法解析用户数据"}

    feeds = data.get("feeds", [])
    if not feeds:
        return {"total": 0, "updated": 0, "error": "未找到笔记数据"}

    notes_dict = self._analytics_data.get("notes", {})
    updated = 0
    note_summaries = []

    for f in feeds:
        nc = f.get("noteCard") or {}
        # In MCP user-profile feeds the note ID lives on f["id"], not nc["noteId"].
        note_id = nc.get("noteId") or f.get("id", "") or f.get("noteId", "")
        if not note_id:
            logger.warning("跳过无 ID 的笔记条目: keys=%s", list(f.keys()))
            continue

        interact = nc.get("interactInfo") or {}
        liked = _safe_int(interact.get("likedCount", 0))
        # Profile-page feeds usually expose only likedCount; comment/collect
        # counts require the detail endpoint, so likes is the primary metric here.

        title = nc.get("displayTitle", "") or ""
        note_type = nc.get("type", "normal")  # normal / video

        # Pull topic/style/tags from the locally backed-up copywriting files.
        local_meta = self._find_local_meta(title)

        note_data = {
            "note_id": note_id,
            "title": title,
            "type": note_type,
            "likes": liked,
            "topic": local_meta.get("topic", ""),
            "style": local_meta.get("style", ""),
            "tags": local_meta.get("tags", []),
            "sd_prompt": local_meta.get("sd_prompt", ""),
            "collected_at": datetime.now().isoformat(),
        }

        # Insert new notes, or refresh entries whose like count changed.
        old = notes_dict.get(note_id, {})
        if old.get("likes", 0) != liked or not old:
            updated += 1
            notes_dict[note_id] = {**old, **note_data}

        note_summaries.append(note_data)

    self._analytics_data["notes"] = notes_dict
    self._analytics_data["last_analysis"] = datetime.now().isoformat()
    self._save_analytics()

    logger.info("采集完成: 共 %d 篇笔记, 更新 %d 篇", len(feeds), updated)
    return {"total": len(feeds), "updated": updated, "notes": note_summaries}
|
||
|
||
def collect_note_details(self, mcp_client, note_id: str, xsec_token: str):
    """Fetch detailed engagement numbers for a single note.

    Args:
        mcp_client: client exposing get_feed_detail(note_id, xsec_token, ...).
        note_id: id of the note to inspect.
        xsec_token: security token required by the MCP endpoint.

    Returns:
        {"likes", "comments_count", "collects", "shares"} as ints, or None
        when the payload is missing or cannot be parsed.  Never raises.
    """
    try:
        result = mcp_client.get_feed_detail(note_id, xsec_token, load_all_comments=False)
        text = ""
        if isinstance(result, dict):
            # Same wrapper shape as _call_tool: prefer result["raw"]["content"],
            # fall back to result["content"], finally result["text"].
            inner_raw = result.get("raw", {})
            content_list = []
            if isinstance(inner_raw, dict):
                content_list = inner_raw.get("content", [])
            if not content_list:
                content_list = result.get("content", [])
            for item in content_list:
                if isinstance(item, dict) and item.get("type") == "text":
                    text = item.get("text", "")
                    break
            if not text:
                text = result.get("text", "")
        if text:
            # Plain JSON first; otherwise try the outermost {...} span.
            data = None
            try:
                data = json.loads(text)
            except Exception:
                m = re.search(r'(\{[\s\S]*\})', text)
                if m:
                    try:
                        data = json.loads(m.group(1))
                    except Exception:
                        pass
            if data:
                interact = data.get("interactInfo") or {}
                comments = data.get("comments", [])
                # commentCount may be absent; fall back to the loaded comment list length.
                return {
                    "likes": _safe_int(interact.get("likedCount", 0)),
                    "comments_count": _safe_int(interact.get("commentCount", len(comments))),
                    "collects": _safe_int(interact.get("collectedCount", 0)),
                    "shares": _safe_int(interact.get("shareCount", 0)),
                }
    except Exception as e:
        logger.warning("获取笔记 %s 详情失败: %s", note_id, e)
    return None
|
||
|
||
def _find_local_meta(self, title: str) -> dict:
|
||
"""从本地 xhs_workspace 中查找匹配标题的备份文案,提取 topic/style/tags"""
|
||
result = {"topic": "", "style": "", "tags": [], "sd_prompt": ""}
|
||
if not title:
|
||
return result
|
||
|
||
# 搜索备份目录
|
||
try:
|
||
for dirname in os.listdir(self.workspace_dir):
|
||
dir_path = os.path.join(self.workspace_dir, dirname)
|
||
if not os.path.isdir(dir_path) or dirname.startswith("_"):
|
||
continue
|
||
txt_path = os.path.join(dir_path, "文案.txt")
|
||
if not os.path.exists(txt_path):
|
||
continue
|
||
try:
|
||
with open(txt_path, "r", encoding="utf-8") as f:
|
||
content = f.read()
|
||
# 检查标题是否匹配
|
||
if title[:10] in content or title in dirname:
|
||
# 提取元数据
|
||
for line in content.split("\n"):
|
||
if line.startswith("风格:"):
|
||
result["style"] = line.split(":", 1)[1].strip()
|
||
elif line.startswith("主题:"):
|
||
result["topic"] = line.split(":", 1)[1].strip()
|
||
elif line.startswith("标签:"):
|
||
tags_str = line.split(":", 1)[1].strip()
|
||
result["tags"] = [t.strip() for t in tags_str.split(",") if t.strip()]
|
||
elif line.startswith("SD Prompt:"):
|
||
result["sd_prompt"] = line.split(":", 1)[1].strip()
|
||
break
|
||
except Exception:
|
||
continue
|
||
except Exception:
|
||
pass
|
||
return result
|
||
|
||
# ========== 权重计算 ==========
|
||
|
||
def calculate_weights(self) -> dict:
    """
    Learn per-dimension content weights from the collected note metrics.

    Each note gets a composite score:
        score = likes * 1.0 + comments * 2.0 + collects * 1.5
    (comments weigh double because they represent deeper engagement; a note
    whose composite is 0 falls back to its raw like count).

    Weights for topics / styles / tags / title patterns / posting-time
    buckets are normalized to 0-100 against the top-scoring note, persisted
    via _save_weights(), and a summary dict is returned.

    Fix over the previous revision: the returned "top_topics" / "top_styles"
    lists are now actually sorted by weight (they used to be sliced from the
    unsorted dicts in insertion order; only the persisted copies were sorted).

    Returns:
        Summary dict, or {"error": ...} when no data has been collected yet.
    """
    notes = self._analytics_data.get("notes", {})
    if not notes:
        return {"error": "暂无笔记数据,请先采集"}

    # ---- composite score per note ----
    scored_notes = []
    for nid, note in notes.items():
        likes = note.get("likes", 0)
        comments_count = note.get("comments_count", 0)
        collects = note.get("collects", 0)
        score = likes * 1.0 + comments_count * 2.0 + collects * 1.5
        if score == 0:
            score = likes  # floor at raw likes (still 0 for a 0-like note)
        scored_notes.append({**note, "score": score, "note_id": nid})

    if not scored_notes:
        return {"error": "没有可分析的笔记"}

    scored_notes.sort(key=lambda n: n["score"], reverse=True)
    # Guard against division by zero when even the best note scored 0.
    max_score = scored_notes[0]["score"] if scored_notes[0]["score"] > 0 else 1

    def _norm(avg: float) -> int:
        """Normalize an average score to an integer 0-100 weight."""
        return min(100, int((avg / max_score) * 100))

    def _bucketize(key_of):
        """Accumulate total score and count per key; key_of(note) yields keys."""
        scores = defaultdict(float)
        counts = defaultdict(int)
        for note in scored_notes:
            for key in key_of(note):
                scores[key] += note["score"]
                counts[key] += 1
        return scores, counts

    # ---- topic weights (repeatedly-validated topics earn a small bonus) ----
    topic_scores, topic_counts = _bucketize(
        lambda n: [n.get("topic", "").strip()] if n.get("topic", "").strip() else []
    )
    topic_weights = {}
    for topic, total_score in topic_scores.items():
        avg_score = total_score / topic_counts[topic]
        weight = _norm(avg_score)
        if topic_counts[topic] >= 3:
            weight = min(100, weight + 10)
        elif topic_counts[topic] >= 2:
            weight = min(100, weight + 5)
        topic_weights[topic] = {
            "weight": weight,
            "count": topic_counts[topic],
            "avg_score": round(avg_score, 1),
            "total_score": round(total_score, 1),
        }

    # ---- style weights ----
    style_scores, style_counts = _bucketize(
        lambda n: [n.get("style", "").strip()] if n.get("style", "").strip() else []
    )
    style_weights = {}
    for style, total_score in style_scores.items():
        avg = total_score / style_counts[style]
        style_weights[style] = {
            "weight": _norm(avg),
            "count": style_counts[style],
            "avg_score": round(avg, 1),
        }

    # ---- tag weights (strip leading '#', keep top 30 by weight) ----
    tag_scores, tag_counts = _bucketize(
        lambda n: [t.strip().lstrip("#") for t in n.get("tags", []) if t.strip().lstrip("#")]
    )
    tag_weights = {
        tag: {"weight": _norm(total / tag_counts[tag]), "count": tag_counts[tag]}
        for tag, total in tag_scores.items()
    }
    tag_weights = dict(sorted(tag_weights.items(), key=lambda kv: kv[1]["weight"], reverse=True)[:30])

    # ---- title pattern weights (emoji / punctuation / length features) ----
    title_patterns = defaultdict(list)
    for note in scored_notes:
        title = note.get("title", "")
        if not title:
            continue
        has_emoji = bool(re.search(r'[\U0001F600-\U0001F9FF\u2600-\u27BF]', title))
        has_question = "?" in title or "?" in title
        has_exclaim = "!" in title or "!" in title
        has_ellipsis = "..." in title or "…" in title
        length_bucket = "短(≤10)" if len(title) <= 10 else ("中(11-15)" if len(title) <= 15 else "长(16-20)")

        for feature, present in [
            ("含emoji", has_emoji), ("疑问句式", has_question),
            ("感叹句式", has_exclaim), ("省略句式", has_ellipsis),
        ]:
            if present:
                title_patterns[feature].append(note["score"])
        title_patterns[f"长度:{length_bucket}"].append(note["score"])

    title_pattern_weights = {}
    for pattern, scores in title_patterns.items():
        avg = sum(scores) / len(scores) if scores else 0
        title_pattern_weights[pattern] = {
            "weight": _norm(avg),
            "count": len(scores),
            "avg_score": round(avg, 1),
        }

    # ---- posting-time weights (3-hour buckets from collected_at) ----
    time_scores = defaultdict(list)
    for note in scored_notes:
        collected = note.get("collected_at", "")
        if not collected:
            continue
        try:
            dt = datetime.fromisoformat(collected)
        except (ValueError, TypeError):
            continue  # malformed timestamp: skip silently
        hour_bucket = f"{(dt.hour // 3) * 3:02d}-{(dt.hour // 3) * 3 + 3:02d}时"
        time_scores[hour_bucket].append(note["score"])

    time_weights = {
        bucket: {
            "weight": _norm(sum(scores) / len(scores) if scores else 0),
            "count": len(scores),
        }
        for bucket, scores in time_scores.items()
    }

    # Sort once so both the persisted dicts and the returned "top_*" slices
    # are in descending weight order.
    topic_weights = dict(sorted(topic_weights.items(), key=lambda kv: kv[1]["weight"], reverse=True))
    style_weights = dict(sorted(style_weights.items(), key=lambda kv: kv[1]["weight"], reverse=True))

    # ---- persist ----
    self._weights.update({
        "topic_weights": topic_weights,
        "style_weights": style_weights,
        "tag_weights": tag_weights,
        "title_pattern_weights": title_pattern_weights,
        "time_weights": time_weights,
        "last_updated": datetime.now().isoformat(),
        "total_notes_analyzed": len(scored_notes),
        "top_note": {
            "title": scored_notes[0].get("title", ""),
            "score": scored_notes[0].get("score", 0),
            "likes": scored_notes[0].get("likes", 0),
        },
    })

    # Append a bounded analysis-history entry (keep only the latest 50 runs).
    history = self._weights.get("analysis_history", [])
    history.append({
        "time": datetime.now().isoformat(),
        "total_notes": len(scored_notes),
        "avg_score": round(sum(n["score"] for n in scored_notes) / len(scored_notes), 1),
        "top_topic": next(iter(topic_weights), ""),
    })
    self._weights["analysis_history"] = history[-50:]
    self._save_weights()

    return {
        "total_notes": len(scored_notes),
        "top_topics": list(topic_weights.items())[:10],
        "top_styles": list(style_weights.items())[:5],
        "top_tags": list(tag_weights.items())[:10],
        "title_patterns": title_pattern_weights,
        "top_note": scored_notes[0],
    }
|
||
|
||
# ========== 加权主题选择 ==========
|
||
|
||
def get_weighted_topic(self, base_topics: list[str] = None) -> str:
    """
    Pick one topic by weighted random draw from the learned topic pool.

    Falls back to a uniform choice over *base_topics* (or the literal
    "日常分享") when no weights have been learned yet.  Topics from
    *base_topics* that have no learned weight join the pool at a modest
    baseline so new ideas still get a chance.
    """
    import random

    learned = self._weights.get("topic_weights", {})
    if not learned:
        # No learned weights: plain uniform pick from the base pool.
        return random.choice(base_topics) if base_topics else "日常分享"

    # Known topics keep their learned weight; unseen base topics get 30.
    pool = {topic: info.get("weight", 50) for topic, info in learned.items()}
    for candidate in base_topics or []:
        pool.setdefault(candidate, 30)

    names = list(pool)
    draw_weights = [max(1, pool[name]) for name in names]  # keep weights >= 1
    chosen = random.choices(names, weights=draw_weights, k=1)[0]

    logger.info("加权选题: %s (权重: %s)", chosen, pool.get(chosen, "?"))
    return chosen
|
||
|
||
def get_weighted_style(self, base_styles: list[str] = None) -> str:
    """Pick a writing style by weighted random draw.

    Uniform fallback over *base_styles* (or "真实分享") when nothing has
    been learned; unseen base styles join the pool with baseline weight 30.
    """
    import random

    learned = self._weights.get("style_weights", {})
    if not learned:
        return random.choice(base_styles) if base_styles else "真实分享"

    pool = {style: info.get("weight", 50) for style, info in learned.items()}
    for candidate in base_styles or []:
        pool.setdefault(candidate, 30)

    names = list(pool)
    draw_weights = [max(1, pool[name]) for name in names]  # keep weights >= 1
    return random.choices(names, weights=draw_weights, k=1)[0]
|
||
|
||
def get_top_tags(self, n: int = 8) -> list[str]:
    """Return the *n* highest-weighted tags (empty list when nothing learned)."""
    tag_weights = self._weights.get("tag_weights", {})
    if not tag_weights:
        return []
    ranked = sorted(tag_weights, key=lambda t: tag_weights[t].get("weight", 0), reverse=True)
    return ranked[:n]
|
||
|
||
def get_title_advice(self) -> str:
    """Summarize the five strongest title patterns as bullet lines."""
    patterns = self._weights.get("title_pattern_weights", {})
    if not patterns:
        return "暂无标题分析数据"

    ranked = sorted(patterns.items(), key=lambda kv: kv[1].get("weight", 0), reverse=True)
    return "\n".join(
        f" • {name}: 权重 {info['weight']}分 (出现{info['count']}次)"
        for name, info in ranked[:5]
    )
|
||
|
||
# ========== LLM 深度分析 ==========
|
||
|
||
def generate_llm_analysis_prompt(self) -> str:
    """Render the top-20 notes (by likes) as the data section of an LLM prompt.

    Returns an empty string when no notes have been collected yet.
    """
    notes = self._analytics_data.get("notes", {})
    if not notes:
        return ""

    # Rank by like count, best first.
    by_likes = sorted(notes.values(), key=lambda n: n.get("likes", 0), reverse=True)

    blocks = []
    for rank, note in enumerate(by_likes[:20], start=1):
        blocks.append(
            f"#{rank} 「{note.get('title', '无标题')}」\n"
            f" 点赞: {note.get('likes', 0)} | 主题: {note.get('topic', '未知')} | "
            f"风格: {note.get('style', '未知')}\n"
            f" 标签: {', '.join(note.get('tags', []))}"
        )
    return "\n".join(blocks)
|
||
|
||
# ========== 报告生成 ==========
|
||
|
||
def generate_report(self) -> str:
    """Render the learned weights as a Markdown analysis report for the UI.

    Returns a placeholder prompt when no note data has been collected yet;
    otherwise builds sections for topic/style/tag/title-pattern weights plus
    actionable suggestions, skipping any dimension with no data.
    """
    weights = self._weights
    notes = self._analytics_data.get("notes", {})

    if not notes:
        return "## 📊 暂无分析数据\n\n请先点击「采集数据」获取笔记表现数据,再点击「计算权重」。"

    total = len(notes)
    last_updated = weights.get("last_updated", "未知")

    # Best-performing note banner.
    top_note = weights.get("top_note", {})
    top_note_str = f"**{top_note.get('title', '')}** (❤️ {top_note.get('likes', 0)})" if top_note else "暂无"

    # Report header; [:19] trims an ISO timestamp to seconds precision.
    lines = [
        f"## 📊 智能内容学习报告",
        f"",
        f"🕐 最后更新: {last_updated[:19] if last_updated else '从未'}",
        f"📝 分析笔记数: **{total}** 篇",
        f"🏆 最佳笔记: {top_note_str}",
        "",
        "---",
        "",
    ]

    # Topic weight ranking table; the bar renders weight/10 filled blocks.
    topic_w = weights.get("topic_weights", {})
    if topic_w:
        lines.append("### 🎯 主题权重排行")
        lines.append("| 排名 | 主题 | 权重 | 笔记数 | 平均得分 |")
        lines.append("|:---:|------|:---:|:---:|:---:|")
        for idx, (topic, info) in enumerate(list(topic_w.items())[:10]):
            bar = "█" * (info["weight"] // 10) + "░" * (10 - info["weight"] // 10)
            lines.append(
                f"| {idx+1} | {topic} | {bar} {info['weight']} | {info['count']} | {info['avg_score']} |"
            )
        lines.append("")

    # Style weight ranking (top 5, bullet list).
    style_w = weights.get("style_weights", {})
    if style_w:
        lines.append("### 🎨 风格权重排行")
        for style, info in list(style_w.items())[:5]:
            bar = "█" * (info["weight"] // 10) + "░" * (10 - info["weight"] // 10)
            lines.append(f"- **{style}**: {bar} {info['weight']}分 ({info['count']}篇)")
        lines.append("")

    # High-weight tags (top 10, single line).
    tag_w = weights.get("tag_weights", {})
    if tag_w:
        lines.append("### 🏷️ 高权重标签 (Top 10)")
        top_tags = list(tag_w.items())[:10]
        tag_strs = [f"`#{t}` ({info['weight']})" for t, info in top_tags]
        lines.append(" | ".join(tag_strs))
        lines.append("")

    # Title pattern analysis (top 6 by weight).
    title_p = weights.get("title_pattern_weights", {})
    if title_p:
        lines.append("### ✏️ 标题模式分析")
        sorted_p = sorted(title_p.items(), key=lambda x: x[1].get("weight", 0), reverse=True)
        for p_name, p_info in sorted_p[:6]:
            lines.append(f"- **{p_name}**: 权重 {p_info['weight']} (出现 {p_info['count']} 次)")
        lines.append("")

    # Actionable suggestions derived from the strongest signals.
    lines.append("---")
    lines.append("### 💡 智能建议")
    if topic_w:
        top_3 = list(topic_w.keys())[:3]
        lines.append(f"- 📌 **高权重主题**: 优先创作 → {', '.join(top_3)}")
    if tag_w:
        hot_tags = [f"#{t}" for t in list(tag_w.keys())[:5]]
        lines.append(f"- 🏷️ **推荐标签**: {' '.join(hot_tags)}")
    if title_p:
        best_pattern = max(title_p.items(), key=lambda x: x[1].get("weight", 0))
        lines.append(f"- ✏️ **标题建议**: 多用「{best_pattern[0]}」(权重{best_pattern[1]['weight']})")

    lines.append("")
    lines.append(f"> 💡 启用「智能加权发布」后,自动发布将按权重倾斜生成高表现内容")

    return "\n".join(lines)
|
||
|
||
def get_weighted_topics_display(self) -> str:
    """Comma-separated top-15 topics by weight, for display in the UI."""
    topic_w = self._weights.get("topic_weights", {})
    if not topic_w:
        return ""
    ranked = sorted(topic_w, key=lambda t: topic_w[t].get("weight", 0), reverse=True)
    return ", ".join(ranked[:15])
|
||
|
||
@property
def has_weights(self) -> bool:
    """True once calculate_weights() has produced at least one topic weight."""
    topic_weights = self._weights.get("topic_weights")
    return bool(topic_weights)
|
||
|
||
@property
def weights_summary(self) -> str:
    """One-line summary: analyzed-note count plus the top three topics."""
    topic_weights = self._weights.get("topic_weights", {})
    if not topic_weights:
        return "暂无权重数据"
    analyzed = self._weights.get("total_notes_analyzed", 0)
    leaders = list(topic_weights)[:3]
    return f"{analyzed}篇笔记 | 热门: {', '.join(leaders)}"
|