xhs_factory/services/analytics_service.py
zhoujie 4d83c0f4a9
Some checks failed
CI / Lint (ruff) (push) Has been cancelled
CI / Import Check (push) Has been cancelled
feat(scheduler): 新增热点自动采集功能并优化发布路径
- 新增热点自动采集后台线程,支持定时搜索关键词并执行 AI 分析,结果缓存至结构化状态
- 新增热点分析状态管理接口,提供线程安全的 `get_last_analysis` 和 `set_last_analysis` 方法
- 新增热点数据桥接函数 `feed_hotspot_to_engine`,将分析结果注入 TopicEngine 实现热点加权推荐
- 新增热点选题下拉组件,分析完成后自动填充推荐选题,选中后自动写入选题输入框
- 优化 `generate_from_hotspot` 函数,自动获取结构化分析摘要并增强生成上下文
- 新增热点自动采集配置节点,支持通过 `config.json` 管理关键词和采集间隔

♻️ refactor(queue): 实现智能排期引擎并统一发布路径

- 新增智能排期引擎,基于 `AnalyticsService` 的 `time_weights` 自动计算最优发布时段
- 新增 `PublishQueue.suggest_schedule_time` 和 `auto_schedule_item` 方法,支持时段冲突检测和内容分布控制
- 修改 `generate_to_queue` 函数,新增 `auto_schedule` 和 `auto_approve` 参数,支持自动排期和自动审核
- 重构 `_scheduler_loop` 的自动发布分支,改为调用 `generate_to_queue` 通过队列发布,统一发布路径
- 重构 `auto_publish_once` 函数,移除直接发布逻辑,改为生成内容入队并返回队列信息
- 新增队列时段使用情况查询方法 `get_slot_usage`,支持 UI 热力图展示

📝 docs(openspec): 新增内容排期优化和热点探测优化规范文档

- 新增 `smart-schedule-engine` 规范,定义智能排期引擎的功能需求和场景
- 新增 `unified-publish-path` 规范,定义统一发布路径的改造方案
- 新增 `hotspot-analysis-state` 规范,定义热点分析状态存储的线程安全接口
- 新增 `hotspot-auto-collector` 规范,定义定时热点自动采集的任务流程
- 新增 `hotspot-engine-bridge` 规范,定义热点数据注入 TopicEngine 的桥接机制
- 新增 `hotspot-topic-selector` 规范,定义热点选题下拉组件的交互行为
- 更新 `services-queue`、`services-scheduler` 和 `services-hotspot` 规范,反映功能修改和新增参数

🔧 chore(config): 新增热点自动采集默认配置

- 在 `DEFAULT_CONFIG` 中新增 `hotspot_auto_collect` 配置节点,包含 `enabled`、`keywords` 和 `interval_hours` 字段
- 提供默认关键词列表 `["穿搭", "美妆", "好物"]` 和默认采集间隔 4 小时

🐛 fix(llm): 增强 JSON 解析容错能力

- 新增 `_try_fix_truncated_json` 方法,尝试修复被 token 限制截断的 JSON 输出
- 支持多种截断场景的自动补全,包括字符串值、数组和嵌套对象的截断修复
- 提高 LLM 分析热点等返回 JSON 的函数的稳定性

💄 style(ui): 优化队列管理和热点探测界面

- 在队列生成区域新增自动排期复选框,勾选后隐藏手动排期输入框
- 在日历视图旁新增推荐时段 Markdown 面板,展示各时段权重和建议热力图
- 在热点探测 Tab 新增推荐选题下拉组件,分析完成后动态填充选项
- 在热点探测 Tab 新增热点自动采集控制区域,支持启动、停止和配置采集参数
2026-02-28 22:22:27 +08:00

686 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
笔记数据分析 & 智能权重学习模块
定时抓取已发布笔记的互动数据,自动学习哪些内容受欢迎,生成加权主题池
"""
import json
import os
import re
import tempfile
import time
import logging
import math
from datetime import datetime, timedelta
from collections import defaultdict
logger = logging.getLogger(__name__)
ANALYTICS_FILE = "analytics_data.json"
WEIGHTS_FILE = "content_weights.json"
def _safe_int(val) -> int:
"""'1.2万' / '1234' / 1234 等格式转为整数"""
if isinstance(val, (int, float)):
return int(val)
if not val:
return 0
s = str(val).strip()
if "" in s:
try:
return int(float(s.replace("", "")) * 10000)
except ValueError:
return 0
try:
return int(float(s))
except ValueError:
return 0
class AnalyticsService:
    """Note-performance analytics and weight-learning engine."""

    def __init__(self, workspace_dir: str = "xhs_workspace"):
        # All persistent state lives under the workspace directory.
        self.workspace_dir = workspace_dir
        self.analytics_path = os.path.join(workspace_dir, ANALYTICS_FILE)
        self.weights_path = os.path.join(workspace_dir, WEIGHTS_FILE)
        # Raw per-note interaction data, keyed by note id.
        default_analytics = {"notes": {}, "last_analysis": ""}
        self._analytics_data = self._load_json(self.analytics_path, default_analytics)
        # Learned weight tables, one sub-dict per content dimension.
        default_weights = {
            "topic_weights": {},
            "style_weights": {},
            "tag_weights": {},
            "title_pattern_weights": {},
            "time_weights": {},
            "last_updated": "",
            "analysis_history": [],
        }
        self._weights = self._load_json(self.weights_path, default_weights)
# ========== 持久化 ==========
@staticmethod
def _load_json(path: str, default: dict) -> dict:
if os.path.exists(path):
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
logger.warning("加载 %s 失败: %s,使用默认值", path, e)
return default.copy()
def _save_analytics(self):
os.makedirs(self.workspace_dir, exist_ok=True)
target = self.analytics_path
target_dir = os.path.dirname(os.path.abspath(target))
fd, tmp = tempfile.mkstemp(dir=target_dir, suffix=".tmp", prefix="analytics_")
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(self._analytics_data, f, ensure_ascii=False, indent=2)
os.replace(tmp, target)
except Exception:
try:
os.remove(tmp)
except OSError:
pass
raise
def _save_weights(self):
os.makedirs(self.workspace_dir, exist_ok=True)
target = self.weights_path
target_dir = os.path.dirname(os.path.abspath(target))
fd, tmp = tempfile.mkstemp(dir=target_dir, suffix=".tmp", prefix="weights_")
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(self._weights, f, ensure_ascii=False, indent=2)
os.replace(tmp, target)
except Exception:
try:
os.remove(tmp)
except OSError:
pass
raise
# ========== Data collection ==========
def collect_note_performance(self, mcp_client, user_id: str, xsec_token: str) -> dict:
    """
    Fetch all of the user's notes plus interaction data via MCP and store
    them into analytics_data.json.

    Returns {"total": N, "updated": M, "notes": [...]} on success, or a dict
    carrying an "error" key when the MCP payload cannot be parsed.
    """
    logger.info("开始采集笔记表现数据 (user_id=%s)", user_id)
    raw = mcp_client.get_user_profile(user_id, xsec_token)
    text = ""
    if isinstance(raw, dict):
        # _call_tool returns {"success": True, "text": "...", "raw": <raw MCP response>}.
        # Prefer raw["raw"]["content"]; fall back to a top-level "content" key.
        inner_raw = raw.get("raw", {})
        content_list = []
        if isinstance(inner_raw, dict):
            content_list = inner_raw.get("content", [])
        if not content_list:
            content_list = raw.get("content", [])
        for item in content_list:
            if isinstance(item, dict) and item.get("type") == "text":
                text = item.get("text", "")
                break
        if not text:
            text = raw.get("text", "")
    # Parse the JSON payload: plain JSON first, then a ```json fenced block,
    # then the widest {...} span. Order matters — strictest/cheapest first.
    data = None
    for attempt_fn in [
        lambda t: json.loads(t),
        lambda t: json.loads(re.search(r'```(?:json)?\s*\n([\s\S]+?)\n```', t).group(1)),
        lambda t: json.loads(re.search(r'(\{[\s\S]*\})', t).group(1)),
    ]:
        try:
            data = attempt_fn(text)
            if data:
                break
        except Exception:
            continue
    if not data:
        return {"total": 0, "updated": 0, "error": "无法解析用户数据"}
    feeds = data.get("feeds", [])
    if not feeds:
        return {"total": 0, "updated": 0, "error": "未找到笔记数据"}
    notes_dict = self._analytics_data.get("notes", {})
    updated = 0
    note_summaries = []
    for f in feeds:
        nc = f.get("noteCard") or {}
        # In MCP user-profile feeds the note id lives in f["id"], not nc["noteId"].
        note_id = nc.get("noteId") or f.get("id", "") or f.get("noteId", "")
        if not note_id:
            logger.warning("跳过无 ID 的笔记条目: keys=%s", list(f.keys()))
            continue
        interact = nc.get("interactInfo") or {}
        liked = _safe_int(interact.get("likedCount", 0))
        # Profile listings usually expose only likedCount; comment/collect
        # counts require the detail endpoint, so likes are the main metric here.
        title = nc.get("displayTitle", "") or ""
        note_type = nc.get("type", "normal")  # "normal" or "video"
        # Recover topic/style/tags from the locally backed-up copy of the post.
        local_meta = self._find_local_meta(title)
        note_data = {
            "note_id": note_id,
            "title": title,
            "type": note_type,
            "likes": liked,
            "topic": local_meta.get("topic", ""),
            "style": local_meta.get("style", ""),
            "tags": local_meta.get("tags", []),
            "sd_prompt": local_meta.get("sd_prompt", ""),
            "collected_at": datetime.now().isoformat(),
        }
        # Upsert: count an update when the entry is new or its likes changed;
        # merge so fields absent from this fetch are preserved.
        old = notes_dict.get(note_id, {})
        if old.get("likes", 0) != liked or not old:
            updated += 1
        notes_dict[note_id] = {**old, **note_data}
        note_summaries.append(note_data)
    self._analytics_data["notes"] = notes_dict
    self._analytics_data["last_analysis"] = datetime.now().isoformat()
    self._save_analytics()
    logger.info("采集完成: 共 %d 篇笔记, 更新 %d", len(feeds), updated)
    return {"total": len(feeds), "updated": updated, "notes": note_summaries}
def collect_note_details(self, mcp_client, note_id: str, xsec_token: str):
    """Fetch detail metrics (likes/comments/collects/shares) for one note.

    Returns a dict of integer counters, or None when the detail payload
    cannot be retrieved or parsed.
    """
    try:
        result = mcp_client.get_feed_detail(note_id, xsec_token, load_all_comments=False)
        payload = ""
        if isinstance(result, dict):
            # Same _call_tool envelope as the profile fetch: prefer
            # result["raw"]["content"], then fall back to "content" / "text".
            wrapped = result.get("raw", {})
            entries = wrapped.get("content", []) if isinstance(wrapped, dict) else []
            if not entries:
                entries = result.get("content", [])
            for entry in entries:
                if isinstance(entry, dict) and entry.get("type") == "text":
                    payload = entry.get("text", "")
                    break
            if not payload:
                payload = result.get("text", "")
        if payload:
            parsed = None
            try:
                parsed = json.loads(payload)
            except Exception:
                # Fall back to the widest {...} span inside the text.
                match = re.search(r'(\{[\s\S]*\})', payload)
                if match:
                    try:
                        parsed = json.loads(match.group(1))
                    except Exception:
                        pass
            if parsed:
                interact = parsed.get("interactInfo") or {}
                comments = parsed.get("comments", [])
                return {
                    "likes": _safe_int(interact.get("likedCount", 0)),
                    "comments_count": _safe_int(interact.get("commentCount", len(comments))),
                    "collects": _safe_int(interact.get("collectedCount", 0)),
                    "shares": _safe_int(interact.get("shareCount", 0)),
                }
    except Exception as e:
        logger.warning("获取笔记 %s 详情失败: %s", note_id, e)
    return None
def _find_local_meta(self, title: str) -> dict:
"""从本地 xhs_workspace 中查找匹配标题的备份文案,提取 topic/style/tags"""
result = {"topic": "", "style": "", "tags": [], "sd_prompt": ""}
if not title:
return result
# 搜索备份目录
try:
for dirname in os.listdir(self.workspace_dir):
dir_path = os.path.join(self.workspace_dir, dirname)
if not os.path.isdir(dir_path) or dirname.startswith("_"):
continue
txt_path = os.path.join(dir_path, "文案.txt")
if not os.path.exists(txt_path):
continue
try:
with open(txt_path, "r", encoding="utf-8") as f:
content = f.read()
# 检查标题是否匹配
if title[:10] in content or title in dirname:
# 提取元数据
for line in content.split("\n"):
if line.startswith("风格:"):
result["style"] = line.split(":", 1)[1].strip()
elif line.startswith("主题:"):
result["topic"] = line.split(":", 1)[1].strip()
elif line.startswith("标签:"):
tags_str = line.split(":", 1)[1].strip()
result["tags"] = [t.strip() for t in tags_str.split(",") if t.strip()]
elif line.startswith("SD Prompt:"):
result["sd_prompt"] = line.split(":", 1)[1].strip()
break
except Exception:
continue
except Exception:
pass
return result
# ========== Weight computation ==========
def calculate_weights(self) -> dict:
    """
    Derive per-dimension content weights from the collected note metrics.

    Score per note: likes * 1.0 + comments * 2.0 + collects * 1.5
    (comments weigh double as the deepest engagement signal).
    Weights are normalised to 0-100 against the top-scoring note,
    persisted via _save_weights(), and a summary dict is returned.
    Returns {"error": ...} when no notes have been collected yet.
    """
    notes = self._analytics_data.get("notes", {})
    if not notes:
        return {"error": "暂无笔记数据,请先采集"}
    # ---- composite score per note ----
    scored_notes = []
    for nid, note in notes.items():
        likes = note.get("likes", 0)
        comments_count = note.get("comments_count", 0)
        collects = note.get("collects", 0)
        score = likes * 1.0 + comments_count * 2.0 + collects * 1.5
        if score == 0:
            # detail metrics missing: fall back to raw likes
            score = likes
        scored_notes.append({**note, "score": score, "note_id": nid})
    if not scored_notes:
        return {"error": "没有可分析的笔记"}
    scored_notes.sort(key=lambda x: x["score"], reverse=True)
    # guard against a 0 top score so normalisation never divides by zero
    max_score = scored_notes[0]["score"] if scored_notes[0]["score"] > 0 else 1
    # ---- topic weights ----
    topic_scores = defaultdict(float)
    topic_counts = defaultdict(int)
    for note in scored_notes:
        topic = note.get("topic", "").strip()
        if topic:
            topic_scores[topic] += note["score"]
            topic_counts[topic] += 1
    topic_weights = {}
    for topic, total_score in topic_scores.items():
        avg_score = total_score / topic_counts[topic]
        weight = min(100, int((avg_score / max_score) * 100)) if max_score > 0 else 50
        # small bonus for topics validated by several notes
        if topic_counts[topic] >= 3:
            weight = min(100, weight + 10)
        elif topic_counts[topic] >= 2:
            weight = min(100, weight + 5)
        topic_weights[topic] = {
            "weight": weight,
            "count": topic_counts[topic],
            "avg_score": round(avg_score, 1),
            "total_score": round(total_score, 1),
        }
    # ---- style weights ----
    style_scores = defaultdict(float)
    style_counts = defaultdict(int)
    for note in scored_notes:
        style = note.get("style", "").strip()
        if style:
            style_scores[style] += note["score"]
            style_counts[style] += 1
    style_weights = {}
    for style, total_score in style_scores.items():
        avg = total_score / style_counts[style]
        weight = min(100, int((avg / max_score) * 100)) if max_score > 0 else 50
        style_weights[style] = {
            "weight": weight,
            "count": style_counts[style],
            "avg_score": round(avg, 1),
        }
    # ---- tag weights (top 30 kept) ----
    tag_scores = defaultdict(float)
    tag_counts = defaultdict(int)
    for note in scored_notes:
        for tag in note.get("tags", []):
            tag = tag.strip().lstrip("#")
            if tag:
                tag_scores[tag] += note["score"]
                tag_counts[tag] += 1
    tag_weights = {}
    for tag, total_score in tag_scores.items():
        avg = total_score / tag_counts[tag]
        weight = min(100, int((avg / max_score) * 100)) if max_score > 0 else 50
        tag_weights[tag] = {"weight": weight, "count": tag_counts[tag]}
    tag_weights = dict(sorted(tag_weights.items(), key=lambda x: x[1]["weight"], reverse=True)[:30])
    # ---- title pattern weights (emoji / punctuation / length buckets) ----
    title_patterns = defaultdict(list)
    for note in scored_notes:
        title = note.get("title", "")
        if not title:
            continue
        has_emoji = bool(re.search(r'[\U0001F600-\U0001F9FF\u2600-\u27BF]', title))
        # FIX: the full-width ?/!/… literals were emptied by a copy tool that
        # strips "ambiguous" Unicode (`"" in title` is always True), so every
        # title matched every punctuation pattern. Restored so each pattern
        # only fires on titles that actually contain the punctuation.
        has_question = "?" in title or "?" in title
        has_exclaim = "!" in title or "!" in title
        has_ellipsis = "..." in title or "…" in title
        length_bucket = "短(≤10)" if len(title) <= 10 else ("中(11-15)" if len(title) <= 15 else "长(16-20)")
        for feature, val in [
            ("含emoji", has_emoji), ("疑问句式", has_question),
            ("感叹句式", has_exclaim), ("省略句式", has_ellipsis),
        ]:
            if val:
                title_patterns[feature].append(note["score"])
        title_patterns[f"长度:{length_bucket}"].append(note["score"])
    title_pattern_weights = {}
    for pattern, scores in title_patterns.items():
        avg = sum(scores) / len(scores) if scores else 0
        title_pattern_weights[pattern] = {
            "weight": min(100, int((avg / max_score) * 100)) if max_score > 0 else 50,
            "count": len(scores),
            "avg_score": round(avg, 1),
        }
    # ---- publish-time weights (3-hour buckets from collected_at) ----
    time_scores = defaultdict(list)
    for note in scored_notes:
        collected = note.get("collected_at", "")
        if collected:
            try:
                dt = datetime.fromisoformat(collected)
                hour_bucket = f"{(dt.hour // 3) * 3:02d}-{(dt.hour // 3) * 3 + 3:02d}"
                time_scores[hour_bucket].append(note["score"])
            except Exception:
                pass
    time_weights = {}
    for bucket, scores in time_scores.items():
        avg = sum(scores) / len(scores) if scores else 0
        time_weights[bucket] = {
            "weight": min(100, int((avg / max_score) * 100)) if max_score > 0 else 50,
            "count": len(scores),
        }
    # ---- persist ----
    self._weights.update({
        "topic_weights": dict(sorted(topic_weights.items(), key=lambda x: x[1]["weight"], reverse=True)),
        "style_weights": dict(sorted(style_weights.items(), key=lambda x: x[1]["weight"], reverse=True)),
        "tag_weights": tag_weights,
        "title_pattern_weights": title_pattern_weights,
        "time_weights": time_weights,
        "last_updated": datetime.now().isoformat(),
        "total_notes_analyzed": len(scored_notes),
        "top_note": {
            "title": scored_notes[0].get("title", ""),
            "score": scored_notes[0].get("score", 0),
            "likes": scored_notes[0].get("likes", 0),
        } if scored_notes else {},
    })
    # rolling analysis history, capped at the latest 50 entries
    history = self._weights.get("analysis_history", [])
    history.append({
        "time": datetime.now().isoformat(),
        "total_notes": len(scored_notes),
        "avg_score": round(sum(n["score"] for n in scored_notes) / len(scored_notes), 1),
        "top_topic": list(topic_weights.keys())[0] if topic_weights else "",
    })
    self._weights["analysis_history"] = history[-50:]
    self._save_weights()
    return {
        "total_notes": len(scored_notes),
        "top_topics": list(topic_weights.items())[:10],
        "top_styles": list(style_weights.items())[:5],
        "top_tags": list(tag_weights.items())[:10],
        "title_patterns": title_pattern_weights,
        "top_note": scored_notes[0] if scored_notes else None,
    }
# ========== Weighted topic selection ==========
def get_weighted_topic(self, base_topics: list[str] = None) -> str:
    """Pick a topic by weighted random draw over the learned topic weights.

    Falls back to a uniform choice over *base_topics* (or the literal
    "日常分享") when no weights have been learned yet.
    """
    import random
    learned = self._weights.get("topic_weights", {})
    if not learned:
        # No learned data: uniform random over the base pool.
        return random.choice(base_topics) if base_topics else "日常分享"
    # Merge learned topics with any new base topics (base weight 30).
    pool = {name: info.get("weight", 50) for name, info in learned.items()}
    for candidate in base_topics or []:
        pool.setdefault(candidate, 30)
    names = list(pool.keys())
    draw_weights = [max(1, pool[name]) for name in names]  # keep weights >= 1
    chosen = random.choices(names, weights=draw_weights, k=1)[0]
    logger.info("加权选题: %s (权重: %s)", chosen, pool.get(chosen, "?"))
    return chosen
def get_weighted_style(self, base_styles: list[str] = None) -> str:
    """Weighted random style pick; uniform fallback when nothing learned."""
    import random
    learned = self._weights.get("style_weights", {})
    if not learned:
        return random.choice(base_styles) if base_styles else "真实分享"
    # Learned styles plus unseen base styles at a base weight of 30.
    pool = {name: info.get("weight", 50) for name, info in learned.items()}
    for candidate in base_styles or []:
        pool.setdefault(candidate, 30)
    names = list(pool.keys())
    draw_weights = [max(1, pool[name]) for name in names]
    return random.choices(names, weights=draw_weights, k=1)[0]
def get_top_tags(self, n: int = 8) -> list[str]:
    """Return the *n* highest-weighted tags (empty list when unlearned)."""
    tag_weights = self._weights.get("tag_weights", {})
    if not tag_weights:
        return []
    ranked = sorted(tag_weights.items(), key=lambda kv: kv[1].get("weight", 0), reverse=True)
    return [name for name, _ in ranked[:n]]
def get_title_advice(self) -> str:
    """Summarise the five strongest title patterns as advice lines."""
    patterns = self._weights.get("title_pattern_weights", {})
    if not patterns:
        return "暂无标题分析数据"
    ranked = sorted(patterns.items(), key=lambda kv: kv[1].get("weight", 0), reverse=True)
    return "\n".join(
        f"{name}: 权重 {info['weight']}分 (出现{info['count']}次)"
        for name, info in ranked[:5]
    )
# ========== 时段权重查询 ==========
_DEFAULT_TIME_WEIGHTS = {
"08-11时": {"weight": 70, "count": 0},
"12-14时": {"weight": 60, "count": 0},
"18-21时": {"weight": 85, "count": 0},
"21-24时": {"weight": 75, "count": 0},
}
def get_time_weights(self) -> dict:
"""返回各时段权重字典。
有分析数据时返回 time_weights无数据时返回默认高流量时段。
返回格式: {"18-21时": {"weight": 85, "count": 12}, ...}
"""
tw = self._weights.get("time_weights", {})
if tw:
return tw
return dict(self._DEFAULT_TIME_WEIGHTS)
# ========== LLM deep analysis ==========
def generate_llm_analysis_prompt(self) -> str:
    """Render the collected notes (top 20 by likes) as the data section of
    an LLM analysis prompt. Returns "" when nothing has been collected."""
    notes = self._analytics_data.get("notes", {})
    if not notes:
        return ""
    ranked = sorted(notes.values(), key=lambda x: x.get("likes", 0), reverse=True)
    blocks = []
    for rank, note in enumerate(ranked[:20]):
        # NOTE(review): "#{rank+1}{title}" has no separator — possibly a
        # stripped full-width character; kept byte-identical to the original.
        blocks.append(
            f"#{rank+1}{note.get('title', '无标题')}\n"
            f" 点赞: {note.get('likes', 0)} | 主题: {note.get('topic', '未知')} | "
            f"风格: {note.get('style', '未知')}\n"
            f" 标签: {', '.join(note.get('tags', []))}"
        )
    return "\n".join(blocks)
# ========== Report generation ==========
def generate_report(self) -> str:
    """Build the Markdown analysis report from stored weights and notes.

    Returns a placeholder message when no notes have been collected yet.
    """
    weights = self._weights
    notes = self._analytics_data.get("notes", {})
    if not notes:
        return "## 📊 暂无分析数据\n\n请先点击「采集数据」获取笔记表现数据,再点击「计算权重」。"
    total = len(notes)
    last_updated = weights.get("last_updated", "未知")
    top_note = weights.get("top_note", {})
    top_note_str = f"**{top_note.get('title', '')}** (❤️ {top_note.get('likes', 0)})" if top_note else "暂无"
    lines = [
        "## 📊 智能内容学习报告",
        "",
        f"🕐 最后更新: {last_updated[:19] if last_updated else '从未'}",
        f"📝 分析笔记数: **{total}** 篇",
        f"🏆 最佳笔记: {top_note_str}",
        "",
        "---",
        "",
    ]

    def _bar(weight: int) -> str:
        # 10-step block-character gauge.
        # FIX: the █/░ literals were emptied by a copy tool that strips
        # "ambiguous" Unicode, so every bar rendered blank; restored —
        # confirm against the original file.
        filled = weight // 10
        return "█" * filled + "░" * (10 - filled)

    # Topic weight ranking table
    topic_w = weights.get("topic_weights", {})
    if topic_w:
        lines.append("### 🎯 主题权重排行")
        lines.append("| 排名 | 主题 | 权重 | 笔记数 | 平均得分 |")
        lines.append("|:---:|------|:---:|:---:|:---:|")
        for idx, (topic, info) in enumerate(list(topic_w.items())[:10]):
            lines.append(
                f"| {idx+1} | {topic} | {_bar(info['weight'])} {info['weight']} | {info['count']} | {info['avg_score']} |"
            )
        lines.append("")
    # Style weight ranking
    style_w = weights.get("style_weights", {})
    if style_w:
        lines.append("### 🎨 风格权重排行")
        for style, info in list(style_w.items())[:5]:
            lines.append(f"- **{style}**: {_bar(info['weight'])} {info['weight']}分 ({info['count']}篇)")
        lines.append("")
    # Top tags
    tag_w = weights.get("tag_weights", {})
    if tag_w:
        lines.append("### 🏷️ 高权重标签 (Top 10)")
        top_tags = list(tag_w.items())[:10]
        lines.append(" | ".join(f"`#{t}` ({info['weight']})" for t, info in top_tags))
        lines.append("")
    # Title pattern analysis
    title_p = weights.get("title_pattern_weights", {})
    if title_p:
        lines.append("### ✏️ 标题模式分析")
        sorted_p = sorted(title_p.items(), key=lambda x: x[1].get("weight", 0), reverse=True)
        for p_name, p_info in sorted_p[:6]:
            lines.append(f"- **{p_name}**: 权重 {p_info['weight']} (出现 {p_info['count']} 次)")
        lines.append("")
    # Actionable suggestions
    lines.append("---")
    lines.append("### 💡 智能建议")
    if topic_w:
        top_3 = list(topic_w.keys())[:3]
        lines.append(f"- 📌 **高权重主题**: 优先创作 → {', '.join(top_3)}")
    if tag_w:
        hot_tags = [f"#{t}" for t in list(tag_w.keys())[:5]]
        lines.append(f"- 🏷️ **推荐标签**: {' '.join(hot_tags)}")
    if title_p:
        best_pattern = max(title_p.items(), key=lambda x: x[1].get("weight", 0))
        lines.append(f"- ✏️ **标题建议**: 多用「{best_pattern[0]}」(权重{best_pattern[1]['weight']})")
    lines.append("")
    lines.append("> 💡 启用「智能加权发布」后,自动发布将按权重倾斜生成高表现内容")
    return "\n".join(lines)
def get_weighted_topics_display(self) -> str:
    """Comma-joined top-15 topics by weight for UI display ("" if none)."""
    topic_w = self._weights.get("topic_weights", {})
    if not topic_w:
        return ""
    ranked = sorted(topic_w.items(), key=lambda kv: kv[1].get("weight", 0), reverse=True)
    return ", ".join(name for name, _ in ranked[:15])
@property
def has_weights(self) -> bool:
    """True once topic weights have been learned."""
    return bool(self._weights.get("topic_weights"))

@property
def weights_summary(self) -> str:
    """One-line summary: analysed note count plus the top three topics."""
    topic_table = self._weights.get("topic_weights", {})
    total = self._weights.get("total_notes_analyzed", 0)
    if not topic_table:
        return "暂无权重数据"
    leaders = list(topic_table.keys())[:3]
    return f"{total}篇笔记 | 热门: {', '.join(leaders)}"