""" services/topic_engine.py 智能选题引擎 — 聚合热点数据 + 历史权重,推荐高潜力选题 """ import logging import os import json import re from datetime import datetime, timedelta logger = logging.getLogger("autobot") class TopicEngine: """ 智能选题推荐引擎 职责: 聚合热点探测结果与历史互动权重,为用户推荐高潜力选题。 不直接访问 MCP / LLM,通过注入的 AnalyticsService 获取数据。 """ def __init__(self, analytics_service): """ Args: analytics_service: AnalyticsService 实例,提供权重和笔记数据 """ self.analytics = analytics_service # ========== 核心: 多维度评分 ========== def score_topic(self, topic: str, hotspot_data: dict = None) -> dict: """ 为单个候选主题计算综合评分 维度: - hotspot_score (0-40): 热点热度 - weight_score (0-30): 历史互动权重 - scarcity_score(0-20): 内容稀缺度 - timeliness_score(0-10): 时效性 Args: topic: 候选主题文本 hotspot_data: 可选的热点分析数据(包含 hot_topics, suggestions 等) Returns: dict with total_score, hotspot_score, weight_score, scarcity_score, timeliness_score """ hotspot_score = self._calc_hotspot_score(topic, hotspot_data) weight_score = self._calc_weight_score(topic) scarcity_score = self._calc_scarcity_score(topic) timeliness_score = self._calc_timeliness_score(topic) total = hotspot_score + weight_score + scarcity_score + timeliness_score return { "total_score": total, "hotspot_score": hotspot_score, "weight_score": weight_score, "scarcity_score": scarcity_score, "timeliness_score": timeliness_score, } # ========== 推荐主题列表 ========== def recommend_topics(self, count: int = 5, hotspot_data: dict = None) -> list[dict]: """ 推荐排序后的选题列表 逻辑: 1. 收集候选主题 (热点 + 权重主题) 2. 对每个主题评分 3. 去重 (语义相近合并) 4. 按总分降序取 top-N 5. 为每个主题生成创作角度建议 Args: count: 返回推荐数量 (默认 5) hotspot_data: 可选的热点分析数据 Returns: list of dict, 每项包含: topic, score, reason, source, angles, score_detail (各维度分数) """ candidates = self._collect_candidates(hotspot_data) if not candidates: logger.warning("选题引擎: 无候选主题可推荐") return [] # 评分 scored = [] for topic, source in candidates: detail = self.score_topic(topic, hotspot_data) scored.append({ "topic": topic, "score": detail["total_score"], "source": source, "score_detail": detail, }) # 去重 scored = self._deduplicate(scored) # 排序 scored.sort(key=lambda x: x["score"], reverse=True) scored = scored[:count] # 生成 reason 和 angles for item in scored: item["reason"] = self._generate_reason(item) item["angles"] = self._generate_angles(item["topic"], item["source"]) return scored # ========== 候选收集 ========== def _collect_candidates(self, hotspot_data: dict = None) -> list[tuple[str, str]]: """ 收集所有候选主题,返回 [(topic, source), ...] source: "hotspot" | "weight" | "trend" """ candidates = [] seen = set() # 1. 从热点数据收集 if hotspot_data: for topic in hotspot_data.get("hot_topics", []): topic_clean = self._clean_topic(topic) if topic_clean and topic_clean not in seen: candidates.append((topic_clean, "hotspot")) seen.add(topic_clean) for suggestion in hotspot_data.get("suggestions", []): topic_clean = self._clean_topic(suggestion.get("topic", "")) if topic_clean and topic_clean not in seen: candidates.append((topic_clean, "hotspot")) seen.add(topic_clean) # 2. 从权重数据收集 topic_weights = self.analytics._weights.get("topic_weights", {}) for topic, info in topic_weights.items(): topic_clean = self._clean_topic(topic) if topic_clean and topic_clean not in seen: candidates.append((topic_clean, "weight")) seen.add(topic_clean) # 3. 从分析历史提取趋势主题 history = self.analytics._weights.get("analysis_history", []) for entry in history[-5:]: top_topic = entry.get("top_topic", "") if top_topic and top_topic not in seen: candidates.append((top_topic, "trend")) seen.add(top_topic) return candidates # ========== 评分子模块 ========== def _calc_hotspot_score(self, topic: str, hotspot_data: dict = None) -> int: """热点热度评分 (0-40)""" if not hotspot_data: return 0 score = 0 # 检查是否在热门主题中 hot_topics = hotspot_data.get("hot_topics", []) for i, ht in enumerate(hot_topics): if self._topic_similar(topic, ht): # 排名越靠前分越高 score = max(score, 40 - i * 5) break # 检查是否在推荐建议中 suggestions = hotspot_data.get("suggestions", []) for suggestion in suggestions: if self._topic_similar(topic, suggestion.get("topic", "")): score = max(score, 30) break return min(40, score) def _calc_weight_score(self, topic: str) -> int: """历史互动权重评分 (0-30)""" topic_weights = self.analytics._weights.get("topic_weights", {}) if not topic_weights: return 0 # 精确匹配 if topic in topic_weights: weight = topic_weights[topic].get("weight", 0) # weight 原始范围 0-100,映射到 0-30 return min(30, int(weight * 0.3)) # 模糊匹配 best_score = 0 for existing_topic, info in topic_weights.items(): if self._topic_similar(topic, existing_topic): weight = info.get("weight", 0) best_score = max(best_score, min(30, int(weight * 0.3))) return best_score def _calc_scarcity_score(self, topic: str) -> int: """ 内容稀缺度评分 (0-20) 近 7 天已发布 >= 2 篇的主题: scarcity_score <= 5 """ notes = self.analytics._analytics_data.get("notes", {}) seven_days_ago = (datetime.now() - timedelta(days=7)).isoformat() recent_count = 0 for nid, note in notes.items(): collected = note.get("collected_at", "") if collected >= seven_days_ago: note_topic = note.get("topic", "") if self._topic_similar(topic, note_topic): recent_count += 1 if recent_count >= 2: return min(5, max(0, 5 - recent_count)) # 发的越多越低 elif recent_count == 1: return 12 # 有一篇,中等稀缺 else: return 20 # 完全空白,高稀缺 def _calc_timeliness_score(self, topic: str) -> int: """ 时效性评分 (0-10) 基于主题是否包含时效性关键词(季节、节日等) """ now = datetime.now() month = now.month # 季节关键词 season_keywords = { "春": [2, 3, 4, 5], "夏": [5, 6, 7, 8], "秋": [8, 9, 10, 11], "冬": [11, 12, 1, 2], "早春": [2, 3], "初夏": [5, 6], "初秋": [8, 9], } # 节日关键词 festival_windows = { "情人节": (2, 10, 2, 18), "三八": (3, 1, 3, 12), "妇女节": (3, 1, 3, 12), "母亲节": (5, 5, 5, 15), "618": (6, 1, 6, 20), "七夕": (7, 20, 8, 15), "中秋": (9, 1, 9, 30), "国庆": (9, 25, 10, 10), "双十一": (10, 20, 11, 15), "双11": (10, 20, 11, 15), "双十二": (12, 1, 12, 15), "圣诞": (12, 15, 12, 28), "元旦": (12, 25, 1, 5), "年货": (1, 5, 2, 10), "春节": (1, 10, 2, 10), "开学": (8, 20, 9, 15), } score = 5 # 基础分 # 季节匹配 for keyword, months in season_keywords.items(): if keyword in topic and month in months: score = max(score, 8) break # 节日窗口匹配 for keyword, (m1, d1, m2, d2) in festival_windows.items(): if keyword in topic: start = datetime(now.year, m1, d1) end = datetime(now.year, m2, d2) # 处理跨年 if start > end: if now >= start or now <= end: score = 10 break elif start <= now <= end: score = 10 break else: score = max(score, 3) # 不在窗口期但有时效关键词 return score # ========== 去重 ========== def _deduplicate(self, scored: list[dict]) -> list[dict]: """ 去重: 语义相近的主题合并,保留分数较高者 例: "春季穿搭" 和 "早春穿搭" 合并为高分项 """ if len(scored) <= 1: return scored result = [] merged_indices = set() for i in range(len(scored)): if i in merged_indices: continue best = scored[i] for j in range(i + 1, len(scored)): if j in merged_indices: continue if self._topic_similar(scored[i]["topic"], scored[j]["topic"]): merged_indices.add(j) if scored[j]["score"] > best["score"]: best = scored[j] result.append(best) return result # ========== 辅助方法 ========== @staticmethod def _clean_topic(topic: str) -> str: """清理主题文本""" if not topic: return "" # 去除序号、emoji、多余空格 t = re.sub(r'^[\d.、)\]】]+\s*', '', topic.strip()) t = re.sub(r'[•·●]', '', t) return t.strip() @staticmethod def _topic_similar(a: str, b: str) -> bool: """ 判断两个主题是否语义相近 (简单规则匹配) 策略: 1. 完全相同 → True 2. 一方包含另一方 → True 3. 去除修饰词后相同 → True 4. 共享核心词比例 > 60% → True """ if not a or not b: return False a_clean = a.strip().lower() b_clean = b.strip().lower() # 完全相同 if a_clean == b_clean: return True # 包含关系 if a_clean in b_clean or b_clean in a_clean: return True # 去修饰词 modifiers = ["早", "初", "晚", "新", "最", "超", "巨", "真的", "必看"] a_core = a_clean b_core = b_clean for mod in modifiers: a_core = a_core.replace(mod, "") b_core = b_core.replace(mod, "") if a_core and b_core and a_core == b_core: return True # 核心词重叠 # 按字分词 (中文简单分词) a_chars = set(a_clean) b_chars = set(b_clean) if len(a_chars) >= 2 and len(b_chars) >= 2: intersection = a_chars & b_chars union = a_chars | b_chars if len(intersection) / len(union) > 0.6: return True return False @staticmethod def _generate_reason(item: dict) -> str: """根据评分生成推荐理由""" detail = item.get("score_detail", {}) parts = [] if detail.get("hotspot_score", 0) >= 25: parts.append("当前热点话题") if detail.get("weight_score", 0) >= 15: parts.append("历史互动表现好") if detail.get("scarcity_score", 0) >= 15: parts.append("内容空白可抢占") if detail.get("timeliness_score", 0) >= 8: parts.append("时效性强") source = item.get("source", "") if source == "hotspot" and not parts: parts.append("热点趋势推荐") elif source == "weight" and not parts: parts.append("基于历史表现推荐") elif source == "trend" and not parts: parts.append("持续趋势主题") if not parts: parts.append("综合推荐") return ",".join(parts) @staticmethod def _generate_angles(topic: str, source: str) -> list[str]: """ 为主题生成 1-3 个创作角度建议 注意: 这里用规则生成,不调用 LLM """ angles = [] # 通用角度模板 templates_by_type = { "穿搭": [ f"从预算角度分享{topic}的平替选择", f"身材不同如何驾驭{topic}", f"一周{topic}不重样的实穿记录", ], "美食": [ f"零失败的{topic}详细做法", f"外卖 vs 自己做{topic}的对比", f"{topic}的隐藏吃法", ], "护肤": [ f"不同肤质的{topic}选择指南", f"踩雷vs回购:{topic}真实体验", f"平价替代大牌{topic}推荐", ], "好物": [ f"用了半年的{topic}真实测评", f"后悔没早买的{topic}清单", f"从使用场景出发推荐{topic}", ], } # 根据主题关键词匹配模板 matched = False for keyword, templates in templates_by_type.items(): if keyword in topic: angles = templates[:3] matched = True break if not matched: # 通用角度 angles = [ f"个人真实体验分享{topic}", f"新手入门{topic}的详细攻略", f"关于{topic}的冷知识和避坑指南", ] # 限制每个角度不超过 30 字 return [a[:30] for a in angles]