feat(analytics): 新增智能学习引擎与笔记表现分析模块

- 新增 `analytics_service.py` 模块,实现笔记数据采集、权重计算与智能分析功能
- 支持定时采集已发布笔记的互动数据(点赞、评论、收藏),并计算主题、风格、标签等多维度权重
- 提供加权随机选题功能,根据历史表现优先生成高互动潜力内容
- 集成 LLM 深度分析,生成内容策略建议与优化报告
- 新增「智能学习」UI 标签页,支持数据采集、权重计算、AI 分析与定时自动学习
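上述「权重计算」的核心公式(互动得分 = likes×1.0 + comments×2.0 + collects×1.5,再归一化到 0-100)可以用一个极简 Python 示意来说明(字段名与归一化细节为示意,实际实现以 `analytics_service.py` 为准):

```python
# 互动得分与权重归一化的极简示意(非实际实现,仅演示公式)
def engagement_score(note: dict) -> float:
    # 评论权重最高(代表深度互动),收藏次之,点赞为基础
    return (note.get("likes", 0) * 1.0
            + note.get("comments_count", 0) * 2.0
            + note.get("collects", 0) * 1.5)

def normalize_weight(score: float, max_score: float) -> int:
    """归一化到 0-100;max_score 为样本中的最高得分"""
    if max_score <= 0:
        return 50
    return min(100, int(score / max_score * 100))

notes = [
    {"likes": 120, "comments_count": 10, "collects": 30},
    {"likes": 40, "comments_count": 2, "collects": 5},
]
scores = [engagement_score(n) for n in notes]           # [185.0, 51.5]
weights = [normalize_weight(s, max(scores)) for s in scores]  # [100, 27]
```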

♻️ refactor(llm): 重构 LLM 服务以支持多模型智能适配与加权文案生成

- 扩展 `llm_service.py`,新增 `get_sd_prompt_guide()` 方法,根据当前 SD 模型动态生成绘图提示词指南
- 新增 `PROMPT_PERFORMANCE_ANALYSIS` 与 `PROMPT_WEIGHTED_COPYWRITING` 提示词模板,支持笔记表现分析与加权文案生成
- 重构 `generate_copy()`、`generate_copy_with_reference()` 方法,支持 `sd_model_name` 与 `persona` 参数,实现多模型适配与人设融合
- 新增 `analyze_note_performance()` 与 `generate_weighted_copy()` 方法,实现 AI 深度分析与智能加权创作
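其中「加权随机选题」的思路大致如下(极简示意;真实实现还会为没有历史数据的新主题补一个基础权重,主题名与权重值均为假设):

```python
import random

# 加权随机选题示意:权重越高的主题被选中的概率越大,
# 但低权重主题仍有机会被选中(保留探索空间)
topic_weights = {"健身打卡": 90, "穿搭显瘦": 60, "饮食管理": 30}

def pick_weighted_topic(weights: dict) -> str:
    topics = list(weights)
    w = [max(1, weights[t]) for t in topics]  # 权重至少为 1,避免完全饿死
    return random.choices(topics, weights=w, k=1)[0]

random.seed(0)
picks = [pick_weighted_topic(topic_weights) for _ in range(1000)]
```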

♻️ refactor(sd): 重构 SD 服务以支持多模型配置系统与智能参数适配

- 重构 `sd_service.py`,引入 `SD_MODEL_PROFILES` 配置体系,支持 `majicmixRealistic`、`Realistic Vision`、`Juggernaut XL` 三款模型
- 新增 `detect_model_profile()`、`get_model_profile()`、`get_model_profile_info()` 方法,实现模型自动识别与档案信息展示
- 重构 `txt2img()` 与 `img2img()` 方法,自动根据当前模型应用最优参数、提示词前缀/后缀与反向提示词
- 更新 `get_sd_preset()` 方法,支持模型专属预设参数加载
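模型自动识别与默认回退的逻辑可以示意如下(档案字段与匹配规则为假设,实际以 `sd_service.py` 中的 `SD_MODEL_PROFILES` 与 `detect_model_profile()` 为准):

```python
# 模型档案自动识别示意:按名称子串匹配,未知模型回退到默认档案
SD_MODEL_PROFILES = {
    "majicmixRealistic": {"arch": "sd15", "resolution": (512, 768)},
    "realisticVision":   {"arch": "sd15", "resolution": (512, 768)},
    "juggernautXL":      {"arch": "sdxl", "resolution": (1024, 1024)},
}

def detect_model_profile(model_name=None) -> str:
    name = (model_name or "").lower()
    if "majicmix" in name:
        return "majicmixRealistic"
    if "realistic" in name and "vision" in name:
        return "realisticVision"
    return "juggernautXL"  # 未知模型回退到默认档案
```

识别出的 key 再去 `SD_MODEL_PROFILES` 取参数,即可实现「未知模型自动回退」。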

🎨 style(config): 更新默认配置与人设池

- 更新 `config.json` 与 `config_manager.py`,将默认模型改为 `gemini-3-flash-preview`,默认人设改为「身材管理健身美女」
- 新增 `use_smart_weights` 配置项,控制是否启用智能加权发布
- 扩展 `PERSONA_POOL_MAP`,新增「身材管理健身美女」人设及其对应主题与关键词库

🔧 chore(main): 集成智能学习引擎并扩展自动发布链路

- 在 `main.py` 中实例化 `AnalyticsService`,并集成至各功能模块
- 扩展 `generate_copy()`、`generate_from_hotspot()`、`auto_publish_once()` 等方法,支持 `sd_model_name`、`persona`、`quality_mode_val` 参数传递
- 实现智能加权发布逻辑:当启用权重且数据可用时,自动选择高权重主题、风格与标签,并使用加权文案模板
- 新增「智能学习」标签页相关 UI 组件与事件处理函数,包括数据采集、权重计算、AI 分析、定时学习与加权主题预览
- 更新 SD 模型选择事件,实时显示模型档案信息卡
- 扩展自动调度器,支持智能权重、人设与画质模式的参数传递
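自动发布链路中「是否走加权生成」的分流判断可以示意为(`use_smart_weights`、`has_weights`、`get_weighted_topic` 参考本次提交,analytics 对象用桩代替):

```python
import random

class StubAnalytics:
    """示意桩:模拟已有权重数据的 AnalyticsService"""
    has_weights = True
    def get_weighted_topic(self, base_topics):
        return "健身打卡"  # 示意:真实实现按权重随机选择

def choose_topic(cfg: dict, analytics, base_topics):
    # 开关打开且已有权重数据 → 走加权选题;否则回退均匀随机
    if cfg.get("use_smart_weights") and analytics.has_weights:
        return analytics.get_weighted_topic(base_topics), True
    return random.choice(base_topics), False

topic, weighted = choose_topic({"use_smart_weights": True}, StubAnalytics(), ["A", "B"])
```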

📝 docs(changelog): 更新版本日志记录新功能与改进

- 在 `CHANGELOG.md` 中新增 `[2.1.0]` 与 `[2.2.0]` 版本记录
- 详细描述「智能学习引擎」与「多 SD 模型智能适配」两大核心功能
- 列出相关代码重构、配置更新与文件新增情况
zhoujie 2026-02-10 21:29:57 +08:00
parent 883082411a
commit 156a18ae0c
7 changed files with 1537 additions and 121 deletions


@ -2,6 +2,50 @@
本项目遵循 [Semantic Versioning](https://semver.org/) 语义化版本规范。
## [2.2.0] - 2026-02-10
### 🚀 新功能
- **多 SD 模型智能适配**
  - 支持 3 款模型档案:majicmixRealistic(东亚网红风)、Realistic Vision(纪实摄影风)、Juggernaut XL(电影大片风)
  - 自动检测当前模型,匹配提示词前缀/后缀、反向提示词、分辨率、CFG 参数
  - LLM 生成 SD 提示词时自动注入模型专属指南(语法、风格、禁忌词)
  - UI 选模型实时显示模型档案信息卡(架构、分辨率、风格说明)
  - 未知模型自动回退到 Juggernaut XL 默认档案并提示
- **身材管理健身美女人设**
  - 新增默认人设:20 个主题 + 18 个关键词库
  - 覆盖健身打卡、穿搭显瘦、饮食管理、身材对比等高互动方向
### ⚙️ 改进
- `sd_service.py` 重构:`SD_MODEL_PROFILES` 配置体系替代旧硬编码预设
- `llm_service.py`:三套文案 Prompt 支持 `{sd_prompt_guide}` 动态占位符
- `main.py`:所有文案/图片生成链路传递 `sd_model_name` 参数
- 自动运营调度链路完整传递 SD 模型参数
## [2.1.0] - 2026-02-10
### 🚀 新功能
- **智能学习引擎**(新 Tab:🧠 智能学习)
  - 自动采集已发布笔记的互动数据(点赞、评论、收藏)
  - 多维度权重计算:主题权重、风格权重、标签权重、标题模式权重
  - AI 深度分析:LLM 分析笔记表现规律,生成内容策略建议
  - 定时自动学习:可配置间隔(1-48 小时),后台自动采集 + 分析
  - 可视化报告:权重排行、模式分析、智能建议
  - 加权主题预览:实时查看权重最高的主题
- **智能加权发布**
  - 自动发布时根据笔记表现权重选择主题(高权重主题优先)
  - 智能加权文案生成:融入权重洞察生成高互动潜力内容
  - 自动补充高权重标签到发布内容
  - 一键开关:可在智能学习 Tab 启用/关闭
### 📁 新文件
- `analytics_service.py` - 笔记数据分析 & 权重学习服务模块
- `xhs_workspace/analytics_data.json` - 笔记表现数据存储
- `xhs_workspace/content_weights.json` - 内容权重数据存储
## [2.0.0] - 2026-02-08
### 🚀 新功能

analytics_service.py(新文件,624 行)

@ -0,0 +1,624 @@
"""
笔记数据分析 & 智能权重学习模块
定时抓取已发布笔记的互动数据,自动学习哪些内容受欢迎,生成加权主题池
"""
import json
import os
import re
import time
import logging
import math
from datetime import datetime, timedelta
from collections import defaultdict
logger = logging.getLogger(__name__)
ANALYTICS_FILE = "analytics_data.json"
WEIGHTS_FILE = "content_weights.json"
def _safe_int(val) -> int:
"""'1.2万' / '1234' / 1234 等格式转为整数"""
if isinstance(val, (int, float)):
return int(val)
if not val:
return 0
s = str(val).strip()
if "" in s:
try:
return int(float(s.replace("", "")) * 10000)
except ValueError:
return 0
try:
return int(float(s))
except ValueError:
return 0
class AnalyticsService:
"""笔记表现分析 & 权重学习引擎"""
def __init__(self, workspace_dir: str = "xhs_workspace"):
self.workspace_dir = workspace_dir
self.analytics_path = os.path.join(workspace_dir, ANALYTICS_FILE)
self.weights_path = os.path.join(workspace_dir, WEIGHTS_FILE)
self._analytics_data = self._load_json(self.analytics_path, {"notes": {}, "last_analysis": ""})
self._weights = self._load_json(self.weights_path, {
"topic_weights": {},
"style_weights": {},
"tag_weights": {},
"title_pattern_weights": {},
"time_weights": {},
"last_updated": "",
"analysis_history": [],
})
# ========== 持久化 ==========
@staticmethod
def _load_json(path: str, default: dict) -> dict:
if os.path.exists(path):
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
logger.warning("加载 %s 失败: %s,使用默认值", path, e)
return default.copy()
def _save_analytics(self):
os.makedirs(self.workspace_dir, exist_ok=True)
with open(self.analytics_path, "w", encoding="utf-8") as f:
json.dump(self._analytics_data, f, ensure_ascii=False, indent=2)
def _save_weights(self):
os.makedirs(self.workspace_dir, exist_ok=True)
with open(self.weights_path, "w", encoding="utf-8") as f:
json.dump(self._weights, f, ensure_ascii=False, indent=2)
# ========== 数据采集 ==========
def collect_note_performance(self, mcp_client, user_id: str, xsec_token: str) -> dict:
"""
        通过 MCP 获取我的所有笔记及其互动数据,存入 analytics_data.json
返回 {"total": N, "updated": M, "notes": [...]}
"""
logger.info("开始采集笔记表现数据 (user_id=%s)", user_id)
raw = mcp_client.get_user_profile(user_id, xsec_token)
text = ""
if isinstance(raw, dict):
content_list = raw.get("content", [])
for item in content_list:
if isinstance(item, dict) and item.get("type") == "text":
text = item.get("text", "")
break
if not text:
text = raw.get("text", "")
# 解析 JSON
data = None
for attempt_fn in [
lambda t: json.loads(t),
lambda t: json.loads(re.search(r'```(?:json)?\s*\n([\s\S]+?)\n```', t).group(1)),
lambda t: json.loads(re.search(r'(\{[\s\S]*\})', t).group(1)),
]:
try:
data = attempt_fn(text)
if data:
break
except Exception:
continue
if not data:
return {"total": 0, "updated": 0, "error": "无法解析用户数据"}
feeds = data.get("feeds", [])
if not feeds:
return {"total": 0, "updated": 0, "error": "未找到笔记数据"}
notes_dict = self._analytics_data.get("notes", {})
updated = 0
note_summaries = []
for f in feeds:
nc = f.get("noteCard") or {}
note_id = nc.get("noteId") or f.get("noteId", "")
if not note_id:
continue
interact = nc.get("interactInfo") or {}
liked = _safe_int(interact.get("likedCount", 0))
# MCP 返回的用户主页笔记列表通常只有 likedCount
# 详情页才有评论数和收藏数,先用点赞数作为主指标
title = nc.get("displayTitle", "") or ""
note_type = nc.get("type", "normal") # normal / video
# 从本地备份的文案中提取主题、风格、标签
local_meta = self._find_local_meta(title)
note_data = {
"note_id": note_id,
"title": title,
"type": note_type,
"likes": liked,
"topic": local_meta.get("topic", ""),
"style": local_meta.get("style", ""),
"tags": local_meta.get("tags", []),
"sd_prompt": local_meta.get("sd_prompt", ""),
"collected_at": datetime.now().isoformat(),
}
# 更新或新增
old = notes_dict.get(note_id, {})
if old.get("likes", 0) != liked or not old:
updated += 1
notes_dict[note_id] = {**old, **note_data}
note_summaries.append(note_data)
self._analytics_data["notes"] = notes_dict
self._analytics_data["last_analysis"] = datetime.now().isoformat()
self._save_analytics()
logger.info("采集完成: 共 %d 篇笔记, 更新 %d", len(feeds), updated)
return {"total": len(feeds), "updated": updated, "notes": note_summaries}
def collect_note_details(self, mcp_client, note_id: str, xsec_token: str):
"""获取单篇笔记的详细数据(点赞、评论数、收藏等)"""
try:
result = mcp_client.get_feed_detail(note_id, xsec_token, load_all_comments=False)
text = ""
if isinstance(result, dict):
for item in result.get("content", []):
if isinstance(item, dict) and item.get("type") == "text":
text = item.get("text", "")
break
if text:
data = None
try:
data = json.loads(text)
except Exception:
m = re.search(r'(\{[\s\S]*\})', text)
if m:
try:
data = json.loads(m.group(1))
except Exception:
pass
if data:
interact = data.get("interactInfo") or {}
comments = data.get("comments", [])
return {
"likes": _safe_int(interact.get("likedCount", 0)),
"comments_count": _safe_int(interact.get("commentCount", len(comments))),
"collects": _safe_int(interact.get("collectedCount", 0)),
"shares": _safe_int(interact.get("shareCount", 0)),
}
except Exception as e:
logger.warning("获取笔记 %s 详情失败: %s", note_id, e)
return None
def _find_local_meta(self, title: str) -> dict:
"""从本地 xhs_workspace 中查找匹配标题的备份文案,提取 topic/style/tags"""
result = {"topic": "", "style": "", "tags": [], "sd_prompt": ""}
if not title:
return result
# 搜索备份目录
try:
for dirname in os.listdir(self.workspace_dir):
dir_path = os.path.join(self.workspace_dir, dirname)
if not os.path.isdir(dir_path) or dirname.startswith("_"):
continue
txt_path = os.path.join(dir_path, "文案.txt")
if not os.path.exists(txt_path):
continue
try:
with open(txt_path, "r", encoding="utf-8") as f:
content = f.read()
# 检查标题是否匹配
if title[:10] in content or title in dirname:
# 提取元数据
for line in content.split("\n"):
if line.startswith("风格:"):
result["style"] = line.split(":", 1)[1].strip()
elif line.startswith("主题:"):
result["topic"] = line.split(":", 1)[1].strip()
elif line.startswith("标签:"):
tags_str = line.split(":", 1)[1].strip()
result["tags"] = [t.strip() for t in tags_str.split(",") if t.strip()]
elif line.startswith("SD Prompt:"):
result["sd_prompt"] = line.split(":", 1)[1].strip()
break
except Exception:
continue
except Exception:
pass
return result
# ========== 权重计算 ==========
def calculate_weights(self) -> dict:
"""
根据已采集的笔记表现数据计算各维度权重
使用 互动得分 = likes * 1.0 + comments * 2.0 + collects * 1.5 加权
返回权重摘要
"""
notes = self._analytics_data.get("notes", {})
if not notes:
return {"error": "暂无笔记数据,请先采集"}
# 计算每篇笔记的综合得分
scored_notes = []
for nid, note in notes.items():
likes = note.get("likes", 0)
comments_count = note.get("comments_count", 0)
collects = note.get("collects", 0)
# 综合得分: 点赞权重 1.0, 评论权重 2.0(评论代表深度互动), 收藏权重 1.5
score = likes * 1.0 + comments_count * 2.0 + collects * 1.5
# 至少用点赞数保底
if score == 0:
score = likes
scored_notes.append({**note, "score": score, "note_id": nid})
if not scored_notes:
return {"error": "没有可分析的笔记"}
# 按得分排序
scored_notes.sort(key=lambda x: x["score"], reverse=True)
max_score = scored_notes[0]["score"] if scored_notes[0]["score"] > 0 else 1
# ---- 主题权重 ----
topic_scores = defaultdict(float)
topic_counts = defaultdict(int)
for note in scored_notes:
topic = note.get("topic", "").strip()
if topic:
topic_scores[topic] += note["score"]
topic_counts[topic] += 1
topic_weights = {}
for topic, total_score in topic_scores.items():
avg_score = total_score / topic_counts[topic]
# 归一化到 0-100
weight = min(100, int((avg_score / max_score) * 100)) if max_score > 0 else 50
# 多篇验证的加分
if topic_counts[topic] >= 3:
weight = min(100, weight + 10)
elif topic_counts[topic] >= 2:
weight = min(100, weight + 5)
topic_weights[topic] = {
"weight": weight,
"count": topic_counts[topic],
"avg_score": round(avg_score, 1),
"total_score": round(total_score, 1),
}
# ---- 风格权重 ----
style_scores = defaultdict(float)
style_counts = defaultdict(int)
for note in scored_notes:
style = note.get("style", "").strip()
if style:
style_scores[style] += note["score"]
style_counts[style] += 1
style_weights = {}
for style, total_score in style_scores.items():
avg = total_score / style_counts[style]
weight = min(100, int((avg / max_score) * 100)) if max_score > 0 else 50
style_weights[style] = {
"weight": weight,
"count": style_counts[style],
"avg_score": round(avg, 1),
}
# ---- 标签权重 ----
tag_scores = defaultdict(float)
tag_counts = defaultdict(int)
for note in scored_notes:
for tag in note.get("tags", []):
tag = tag.strip().lstrip("#")
if tag:
tag_scores[tag] += note["score"]
tag_counts[tag] += 1
tag_weights = {}
for tag, total_score in tag_scores.items():
avg = total_score / tag_counts[tag]
weight = min(100, int((avg / max_score) * 100)) if max_score > 0 else 50
tag_weights[tag] = {"weight": weight, "count": tag_counts[tag]}
# 排序后取 Top
tag_weights = dict(sorted(tag_weights.items(), key=lambda x: x[1]["weight"], reverse=True)[:30])
# ---- 标题模式权重 (提取 emoji/句式/长度特征) ----
title_patterns = defaultdict(list)
for note in scored_notes:
title = note.get("title", "")
if not title:
continue
# 检测标题特征
has_emoji = bool(re.search(r'[\U0001F600-\U0001F9FF\u2600-\u27BF]', title))
has_question = "" in title or "?" in title
has_exclaim = "" in title or "!" in title
has_ellipsis = "..." in title or "" in title
length_bucket = "短(≤10)" if len(title) <= 10 else ("中(11-15)" if len(title) <= 15 else "长(16-20)")
for feature, val in [
("含emoji", has_emoji), ("疑问句式", has_question),
("感叹句式", has_exclaim), ("省略句式", has_ellipsis),
]:
if val:
title_patterns[feature].append(note["score"])
title_patterns[f"长度:{length_bucket}"].append(note["score"])
title_pattern_weights = {}
for pattern, scores in title_patterns.items():
avg = sum(scores) / len(scores) if scores else 0
title_pattern_weights[pattern] = {
"weight": min(100, int((avg / max_score) * 100)) if max_score > 0 else 50,
"count": len(scores),
"avg_score": round(avg, 1),
}
# ---- 发布时间权重 ----
time_scores = defaultdict(list)
for note in scored_notes:
collected = note.get("collected_at", "")
if collected:
try:
dt = datetime.fromisoformat(collected)
hour_bucket = f"{(dt.hour // 3) * 3:02d}-{(dt.hour // 3) * 3 + 3:02d}"
time_scores[hour_bucket].append(note["score"])
except Exception:
pass
time_weights = {}
for bucket, scores in time_scores.items():
avg = sum(scores) / len(scores) if scores else 0
time_weights[bucket] = {
"weight": min(100, int((avg / max_score) * 100)) if max_score > 0 else 50,
"count": len(scores),
}
# ---- 保存权重 ----
self._weights.update({
"topic_weights": dict(sorted(topic_weights.items(), key=lambda x: x[1]["weight"], reverse=True)),
"style_weights": dict(sorted(style_weights.items(), key=lambda x: x[1]["weight"], reverse=True)),
"tag_weights": tag_weights,
"title_pattern_weights": title_pattern_weights,
"time_weights": time_weights,
"last_updated": datetime.now().isoformat(),
"total_notes_analyzed": len(scored_notes),
"top_note": {
"title": scored_notes[0].get("title", ""),
"score": scored_notes[0].get("score", 0),
"likes": scored_notes[0].get("likes", 0),
} if scored_notes else {},
})
# 追加分析历史
history = self._weights.get("analysis_history", [])
history.append({
"time": datetime.now().isoformat(),
"total_notes": len(scored_notes),
"avg_score": round(sum(n["score"] for n in scored_notes) / len(scored_notes), 1),
"top_topic": list(topic_weights.keys())[0] if topic_weights else "",
})
# 只保留最近 50 条
self._weights["analysis_history"] = history[-50:]
self._save_weights()
return {
"total_notes": len(scored_notes),
"top_topics": list(topic_weights.items())[:10],
"top_styles": list(style_weights.items())[:5],
"top_tags": list(tag_weights.items())[:10],
"title_patterns": title_pattern_weights,
"top_note": scored_notes[0] if scored_notes else None,
}
# ========== 加权主题选择 ==========
def get_weighted_topic(self, base_topics: list[str] = None) -> str:
"""
根据权重从主题池中加权随机选择一个主题
如果没有权重数据, 退回均匀随机
"""
import random
topic_weights = self._weights.get("topic_weights", {})
if not topic_weights:
# 无权重数据,从基础池中随机
return random.choice(base_topics) if base_topics else "日常分享"
# 合并: 已有权重的主题 + base_topics 中新的主题
all_topics = {}
for topic, info in topic_weights.items():
all_topics[topic] = info.get("weight", 50)
if base_topics:
for t in base_topics:
if t not in all_topics:
all_topics[t] = 30 # 新主题给一个基础权重
# 加权随机选择
topics = list(all_topics.keys())
weights = [max(1, all_topics[t]) for t in topics] # 确保权重 >= 1
chosen = random.choices(topics, weights=weights, k=1)[0]
logger.info("加权选题: %s (权重: %s)", chosen, all_topics.get(chosen, "?"))
return chosen
def get_weighted_style(self, base_styles: list[str] = None) -> str:
"""根据权重选择风格"""
import random
style_weights = self._weights.get("style_weights", {})
if not style_weights:
return random.choice(base_styles) if base_styles else "真实分享"
all_styles = {}
for style, info in style_weights.items():
all_styles[style] = info.get("weight", 50)
if base_styles:
for s in base_styles:
if s not in all_styles:
all_styles[s] = 30
styles = list(all_styles.keys())
weights = [max(1, all_styles[s]) for s in styles]
return random.choices(styles, weights=weights, k=1)[0]
def get_top_tags(self, n: int = 8) -> list[str]:
"""获取权重最高的 N 个标签"""
tag_weights = self._weights.get("tag_weights", {})
if not tag_weights:
return []
sorted_tags = sorted(tag_weights.items(), key=lambda x: x[1].get("weight", 0), reverse=True)
return [t[0] for t in sorted_tags[:n]]
def get_title_advice(self) -> str:
"""根据标题模式权重生成建议"""
patterns = self._weights.get("title_pattern_weights", {})
if not patterns:
return "暂无标题分析数据"
sorted_p = sorted(patterns.items(), key=lambda x: x[1].get("weight", 0), reverse=True)
advice_parts = []
for p_name, p_info in sorted_p[:5]:
advice_parts.append(f"{p_name}: 权重 {p_info['weight']}分 (出现{p_info['count']}次)")
return "\n".join(advice_parts)
# ========== LLM 深度分析 ==========
def generate_llm_analysis_prompt(self) -> str:
"""生成给 LLM 分析笔记表现的 prompt 数据部分"""
notes = self._analytics_data.get("notes", {})
if not notes:
return ""
# 按点赞排序
sorted_notes = sorted(notes.values(), key=lambda x: x.get("likes", 0), reverse=True)
lines = []
for i, note in enumerate(sorted_notes[:20]):
lines.append(
f"#{i+1}{note.get('title', '无标题')}\n"
f" 点赞: {note.get('likes', 0)} | 主题: {note.get('topic', '未知')} | "
f"风格: {note.get('style', '未知')}\n"
f" 标签: {', '.join(note.get('tags', []))}"
)
return "\n".join(lines)
# ========== 报告生成 ==========
def generate_report(self) -> str:
"""生成 Markdown 格式的分析报告"""
weights = self._weights
notes = self._analytics_data.get("notes", {})
if not notes:
return "## 📊 暂无分析数据\n\n请先点击「采集数据」获取笔记表现数据,再点击「计算权重」。"
total = len(notes)
last_updated = weights.get("last_updated", "未知")
# Top Note
top_note = weights.get("top_note", {})
top_note_str = f"**{top_note.get('title', '')}** (❤️ {top_note.get('likes', 0)})" if top_note else "暂无"
lines = [
f"## 📊 智能内容学习报告",
f"",
f"🕐 最后更新: {last_updated[:19] if last_updated else '从未'}",
f"📝 分析笔记数: **{total}** 篇",
f"🏆 最佳笔记: {top_note_str}",
"",
"---",
"",
]
# 主题权重
topic_w = weights.get("topic_weights", {})
if topic_w:
lines.append("### 🎯 主题权重排行")
lines.append("| 排名 | 主题 | 权重 | 笔记数 | 平均得分 |")
lines.append("|:---:|------|:---:|:---:|:---:|")
for idx, (topic, info) in enumerate(list(topic_w.items())[:10]):
bar = "" * (info["weight"] // 10) + "" * (10 - info["weight"] // 10)
lines.append(
f"| {idx+1} | {topic} | {bar} {info['weight']} | {info['count']} | {info['avg_score']} |"
)
lines.append("")
# 风格权重
style_w = weights.get("style_weights", {})
if style_w:
lines.append("### 🎨 风格权重排行")
for style, info in list(style_w.items())[:5]:
bar = "" * (info["weight"] // 10) + "" * (10 - info["weight"] // 10)
lines.append(f"- **{style}**: {bar} {info['weight']}分 ({info['count']}篇)")
lines.append("")
# 标签权重
tag_w = weights.get("tag_weights", {})
if tag_w:
lines.append("### 🏷️ 高权重标签 (Top 10)")
top_tags = list(tag_w.items())[:10]
tag_strs = [f"`#{t}` ({info['weight']})" for t, info in top_tags]
lines.append(" | ".join(tag_strs))
lines.append("")
# 标题模式
title_p = weights.get("title_pattern_weights", {})
if title_p:
lines.append("### ✏️ 标题模式分析")
sorted_p = sorted(title_p.items(), key=lambda x: x[1].get("weight", 0), reverse=True)
for p_name, p_info in sorted_p[:6]:
lines.append(f"- **{p_name}**: 权重 {p_info['weight']} (出现 {p_info['count']} 次)")
lines.append("")
# 建议
lines.append("---")
lines.append("### 💡 智能建议")
if topic_w:
top_3 = list(topic_w.keys())[:3]
lines.append(f"- 📌 **高权重主题**: 优先创作 → {', '.join(top_3)}")
if tag_w:
hot_tags = [f"#{t}" for t in list(tag_w.keys())[:5]]
lines.append(f"- 🏷️ **推荐标签**: {' '.join(hot_tags)}")
if title_p:
best_pattern = max(title_p.items(), key=lambda x: x[1].get("weight", 0))
lines.append(f"- ✏️ **标题建议**: 多用「{best_pattern[0]}」(权重{best_pattern[1]['weight']})")
lines.append("")
lines.append(f"> 💡 启用「智能加权发布」后,自动发布将按权重倾斜生成高表现内容")
return "\n".join(lines)
def get_weighted_topics_display(self) -> str:
"""获取加权后的主题列表(用于UI显示)"""
topic_w = self._weights.get("topic_weights", {})
if not topic_w:
return ""
# 按权重排序,返回逗号分隔
sorted_topics = sorted(topic_w.items(), key=lambda x: x[1].get("weight", 0), reverse=True)
return ", ".join([t[0] for t in sorted_topics[:15]])
@property
def has_weights(self) -> bool:
"""是否已有权重数据"""
return bool(self._weights.get("topic_weights"))
@property
def weights_summary(self) -> str:
"""一行权重摘要"""
tw = self._weights.get("topic_weights", {})
total = self._weights.get("total_notes_analyzed", 0)
if not tw:
return "暂无权重数据"
top = list(tw.keys())[:3]
return f"{total}篇笔记 | 热门: {', '.join(top)}"

config.json

@ -3,7 +3,7 @@
"base_url": "https://wolfai.top/v1", "base_url": "https://wolfai.top/v1",
"sd_url": "http://127.0.0.1:7861", "sd_url": "http://127.0.0.1:7861",
"mcp_url": "http://localhost:18060/mcp", "mcp_url": "http://localhost:18060/mcp",
"model": "deepseek-v3", "model": "gemini-3-flash-preview",
"persona": "温柔知性的时尚博主", "persona": "温柔知性的时尚博主",
"auto_reply_enabled": false, "auto_reply_enabled": false,
"schedule_enabled": false, "schedule_enabled": false,

config_manager.py

@ -17,12 +17,13 @@ DEFAULT_CONFIG = {
"sd_url": "http://127.0.0.1:7860", "sd_url": "http://127.0.0.1:7860",
"mcp_url": "http://localhost:18060/mcp", "mcp_url": "http://localhost:18060/mcp",
"model": "gpt-3.5-turbo", "model": "gpt-3.5-turbo",
"persona": "温柔知性的时尚博主", "persona": "身材管理健身美女,热爱分享好身材秘诀和穿搭显身材技巧",
"auto_reply_enabled": False, "auto_reply_enabled": False,
"schedule_enabled": False, "schedule_enabled": False,
"my_user_id": "", "my_user_id": "",
"active_llm": "", "active_llm": "",
"llm_providers": [], "llm_providers": [],
"use_smart_weights": True,
} }

llm_service.py

@ -45,18 +45,68 @@ PROMPT_COPYWRITING = """
  8. 结尾加 5-8 个相关话题标签(#)
  绘图 Prompt:
- 生成 Stable Diffusion 英文提示词,适配 JuggernautXL 模型:
- - 人物要求(最重要!):如果画面中有人物,必须是东亚面孔的中国人,使用 asian girl/boy, chinese, east asian features, black hair, dark brown eyes, delicate facial features, fair skin, slim figure 等描述,绝对禁止出现西方人/欧美人特征
- - 质量词:masterpiece, best quality, ultra detailed, 8k uhd, high resolution
- - 光影:natural lighting, soft shadows, studio lighting, golden hour(根据场景选择)
- - 风格:photorealistic, cinematic, editorial photography, ins style, chinese social media aesthetic
- - 构图:dynamic angle, depth of field, bokeh
- - 细节:detailed skin texture, sharp focus, vivid colors
- - 审美偏向:整体画面风格偏向东方审美,清新淡雅,小红书风格
- 不要使用括号权重语法,直接用英文逗号分隔描述
+ {sd_prompt_guide}
  返回 JSON 格式:
- {"title": "...", "content": "...", "sd_prompt": "...", "tags": ["标签1", "标签2", ...]}
+ {{"title": "...", "content": "...", "sd_prompt": "...", "tags": ["标签1", "标签2", ...]}}
"""
PROMPT_PERFORMANCE_ANALYSIS = """
你是一个有实战经验的小红书运营数据分析师。下面是一个博主已发布的笔记数据,按互动量从高到低排列:
{note_data}
【权重学习分析任务】
请深度分析这些笔记的互动数据,找出什么样的内容最受欢迎的规律。
请分析以下维度:
1. **高表现内容特征**:表现好的笔记有什么共同特征?(主题、标题套路、风格、标签)越具体越好
2. **低表现内容反思**:表现差的笔记问题出在哪?是选题不行、标题没吸引力,还是其他原因?
3. **用户偏好画像**:从数据反推,关注这个账号的用户最喜欢什么样的内容
4. **内容优化建议**:给出 5 个具体的下一步内容方向,每个都要说清楚为什么推荐
5. **标题优化建议**:总结 3 个高互动标题的写法模板,直接给出可套用的句式
6. **最佳实践标签**:推荐 10 个最有流量潜力的标签组合
注意:
- 用数据说话,不要空谈
- 建议要具体到可以直接执行的程度
- 不要说废话和套话
返回 JSON 格式:
{{"high_perform_features": "...", "low_perform_issues": "...", "user_preference": "...", "content_suggestions": [{{"topic": "...", "reason": "...", "priority": 1-5}}], "title_templates": ["模板1", "模板2", "模板3"], "recommended_tags": ["标签1", "标签2", ...]}}
"""
PROMPT_WEIGHTED_COPYWRITING = """
你是一个真实的小红书博主,正在用手机编辑一篇笔记。
【智能学习洞察】(基于你过去笔记的数据分析):
{weight_insights}
【创作要求】
基于以上数据洞察,请创作一篇更容易获得高互动的笔记。要把数据分析的结论融入创作,但写出来的内容要自然,不能看出是"为了数据而写"。
标题规则(严格执行):
1. 长度限制:必须控制在 18 字以内(含 Emoji),绝对不能超过 20 字
2. 参考高互动标题的模式:{title_advice}
3. 口语化,有情绪感,像发朋友圈
4. 禁止广告法违禁词
正文规则(像说话一样写):
1. 想象你在跟闺蜜/朋友面对面聊天
2. 正文控制在 400-600 字
3. 自然展开,不要分点罗列
4. 可以有小情绪:吐槽、感叹、自嘲、开心炸裂
5. emoji 穿插在情绪高点,不要每句都有
6. 绝对禁止 AI 痕迹、书面用语
推荐标签:优先使用这些高权重标签 → {hot_tags}
绘图 Prompt:
{sd_prompt_guide}
返回 JSON 格式:
{{"title": "...", "content": "...", "sd_prompt": "...", "tags": ["标签1", "标签2", ...]}}
""" """
PROMPT_HOTSPOT_ANALYSIS = """ PROMPT_HOTSPOT_ANALYSIS = """
@ -208,13 +258,7 @@ PROMPT_COPY_WITH_REFERENCE = """
  6. 结尾加 5-8 个话题标签(#)
  绘图 Prompt:
- 生成 Stable Diffusion 英文提示词,适配 JuggernautXL 模型:
- - 人物要求(最重要!):如果画面中有人物,必须是东亚面孔的中国人,使用 asian girl/boy, chinese, east asian features, black hair, dark brown eyes, delicate facial features, fair skin, slim figure 等描述,绝对禁止出现西方人/欧美人特征
- - 必含质量词:masterpiece, best quality, ultra detailed, 8k uhd
- - 风格:photorealistic, cinematic, editorial photography, chinese social media aesthetic
- - 光影和细节:natural lighting, sharp focus, vivid colors, detailed skin texture
- - 审美偏向:整体画面风格偏向东方审美,清新淡雅,小红书风格
- - 用英文逗号分隔,不用括号权重语法
+ {sd_prompt_guide}
  返回 JSON 格式:
  {{"title": "...", "content": "...", "sd_prompt": "...", "tags": ["标签1", "标签2", ...]}}
@ -232,6 +276,56 @@ class LLMService:
        self.base_url = base_url.rstrip("/")
        self.model = model
@staticmethod
def get_sd_prompt_guide(sd_model_name: str = None) -> str:
"""根据当前 SD 模型生成 LLM 使用的绘图 Prompt 指南"""
from sd_service import SD_MODEL_PROFILES, detect_model_profile
key = detect_model_profile(sd_model_name) if sd_model_name else "juggernautXL"
profile = SD_MODEL_PROFILES.get(key, SD_MODEL_PROFILES["juggernautXL"])
arch = profile.get("arch", "sdxl")
display = profile.get("display_name", key)
desc = profile.get("description", "")
if key == "majicmixRealistic":
return (
f"生成 Stable Diffusion 英文提示词,当前使用模型: {display} ({desc})\n"
"该模型擅长东亚网红/朋友圈自拍风格,请按以下规则生成 sd_prompt\n"
"- 人物要求(最重要!):必须是东亚面孔中国人\n"
"- 推荐使用 (权重:数值) 语法加强关键词,例如 (asian girl:1.3), (best quality:1.4)\n"
"- 风格关键词RAW photo, realistic, photorealistic, natural makeup, instagram aesthetic\n"
"- 氛围词soft lighting, warm tone, natural skin texture, phone camera feel\n"
"- 非常适合:自拍、穿搭展示、美妆效果、生活日常、闺蜜合照风格\n"
"- 画面要有「朋友圈精选照片」的感觉,自然不做作\n"
"- 用英文逗号分隔"
)
elif key == "realisticVision":
return (
f"生成 Stable Diffusion 英文提示词,当前使用模型: {display} ({desc})\n"
"该模型擅长写实纪实摄影风格,请按以下规则生成 sd_prompt\n"
"- 人物要求(最重要!):必须是东亚面孔中国人\n"
"- 推荐使用 (权重:数值) 语法,例如 (realistic:1.4), (photorealistic:1.4)\n"
"- 风格关键词RAW photo, DSLR, documentary style, street photography, film color grading\n"
"- 质感词skin pores, detailed skin texture, natural imperfections, real lighting\n"
"- 镜头感shot on Canon/Sony, 85mm lens, f/1.8, depth of field\n"
"- 非常适合:街拍、纪实风、旅行照、真实场景、有故事感的画面\n"
"- 画面要有「专业摄影师抓拍」的质感,保留真实皮肤纹理\n"
"- 用英文逗号分隔"
)
else: # juggernautXL (SDXL)
return (
f"生成 Stable Diffusion 英文提示词,当前使用模型: {display} ({desc})\n"
"该模型为 SDXL 架构,擅长电影级大片质感,请按以下规则生成 sd_prompt\n"
"- 人物要求(最重要!):必须是东亚面孔中国人,绝对禁止西方人特征\n"
"- 不要使用 (权重:数值) 括号语法SDXL 模型直接用逗号分隔即可\n"
"- 质量词masterpiece, best quality, ultra detailed, 8k uhd, high resolution\n"
"- 风格photorealistic, cinematic lighting, cinematic composition, commercial photography\n"
"- 光影volumetric lighting, ray tracing, golden hour, studio lighting\n"
"- 非常适合:商业摄影、时尚大片、复杂光影场景、杂志封面风格\n"
"- 画面要有「电影画面/杂志大片」的高级感\n"
"- 用英文逗号分隔"
)
    def _chat(self, system_prompt: str, user_message: str,
              json_mode: bool = True, temperature: float = 0.8) -> str:
        """底层聊天接口(含空返回检测、json_mode 回退、模型降级)"""
@ -405,16 +499,21 @@ class LLMService:
logger.warning("获取模型列表失败 (%s): %s", url, e) logger.warning("获取模型列表失败 (%s): %s", url, e)
return [] return []
-    def generate_copy(self, topic: str, style: str) -> dict:
+    def generate_copy(self, topic: str, style: str, sd_model_name: str = None, persona: str = None) -> dict:
-        """生成小红书文案(含重试逻辑)"""
+        """生成小红书文案(含重试逻辑,自动适配 SD 模型,支持人设)"""
+        sd_guide = self.get_sd_prompt_guide(sd_model_name)
+        system_prompt = PROMPT_COPYWRITING.format(sd_prompt_guide=sd_guide)
+        user_msg = f"主题:{topic}\n风格:{style}"
+        if persona:
+            user_msg = f"【博主人设】:{persona}\n请以此人设的视角和风格创作。\n\n{user_msg}"
         last_error = None
         for attempt in range(2):
             try:
                 # 第二次尝试不使用 json_mode(兼容不支持的模型)
                 use_json_mode = (attempt == 0)
                 content = self._chat(
-                    PROMPT_COPYWRITING,
-                    f"主题:{topic}\n风格:{style}",
+                    system_prompt,
+                    user_msg,
                     json_mode=use_json_mode,
                     temperature=0.92,
                 )
@ -443,17 +542,22 @@ class LLMService:
raise RuntimeError(f"文案生成失败: LLM 返回无法解析为 JSON已重试 2 次。\n最后错误: {last_error}") raise RuntimeError(f"文案生成失败: LLM 返回无法解析为 JSON已重试 2 次。\n最后错误: {last_error}")
def generate_copy_with_reference(self, topic: str, style: str, def generate_copy_with_reference(self, topic: str, style: str,
reference_notes: str) -> dict: reference_notes: str, sd_model_name: str = None, persona: str = None) -> dict:
"""参考热门笔记生成文案(含重试逻辑)""" """参考热门笔记生成文案含重试逻辑自动适配SD模型支持人设"""
sd_guide = self.get_sd_prompt_guide(sd_model_name)
prompt = PROMPT_COPY_WITH_REFERENCE.format( prompt = PROMPT_COPY_WITH_REFERENCE.format(
reference_notes=reference_notes, topic=topic, style=style reference_notes=reference_notes, topic=topic, style=style,
sd_prompt_guide=sd_guide,
) )
user_msg = f"请创作关于「{topic}」的小红书笔记"
if persona:
user_msg = f"【博主人设】:{persona}\n请以此人设的视角和风格创作。\n\n{user_msg}"
last_error = None last_error = None
for attempt in range(2): for attempt in range(2):
try: try:
use_json_mode = (attempt == 0) use_json_mode = (attempt == 0)
content = self._chat( content = self._chat(
prompt, f"请创作关于「{topic}」的小红书笔记", prompt, user_msg,
json_mode=use_json_mode, temperature=0.92, json_mode=use_json_mode, temperature=0.92,
) )
data = self._parse_json(content) data = self._parse_json(content)
@ -572,3 +676,60 @@ class LLMService:
        )
        raw = self._chat(prompt, "请生成评论", json_mode=False, temperature=0.95)
        return self._humanize(raw)
def analyze_note_performance(self, note_data: str) -> dict:
"""AI 深度分析笔记表现,生成内容策略建议"""
prompt = PROMPT_PERFORMANCE_ANALYSIS.format(note_data=note_data)
last_error = None
for attempt in range(2):
try:
use_json_mode = (attempt == 0)
content = self._chat(prompt, "请深度分析以上笔记数据,找出规律并给出优化建议",
json_mode=use_json_mode, temperature=0.7)
return self._parse_json(content)
except (json.JSONDecodeError, ValueError) as e:
last_error = e
if attempt == 0:
logger.warning("表现分析 JSON 解析失败 (尝试 %d/2): %s", attempt + 1, e)
continue
raise RuntimeError(f"笔记表现分析失败: {last_error}")
def generate_weighted_copy(self, topic: str, style: str,
weight_insights: str, title_advice: str,
hot_tags: str, sd_model_name: str = None, persona: str = None) -> dict:
"""基于权重学习生成高互动潜力的文案自动适配SD模型支持人设"""
sd_guide = self.get_sd_prompt_guide(sd_model_name)
prompt = PROMPT_WEIGHTED_COPYWRITING.format(
weight_insights=weight_insights,
title_advice=title_advice,
hot_tags=hot_tags,
sd_prompt_guide=sd_guide,
)
user_msg = f"主题:{topic}\n风格:{style}\n请创作一篇基于数据洞察的高质量小红书笔记"
if persona:
user_msg = f"【博主人设】:{persona}\n请以此人设的视角和风格创作。\n\n{user_msg}"
last_error = None
for attempt in range(2):
try:
use_json_mode = (attempt == 0)
content = self._chat(
prompt,
user_msg,
json_mode=use_json_mode,
temperature=0.92,
)
data = self._parse_json(content)
title = data.get("title", "")
if len(title) > 20:
data["title"] = title[:20]
if "content" in data:
data["content"] = self._humanize_content(data["content"])
return data
except (json.JSONDecodeError, ValueError) as e:
last_error = e
if attempt == 0:
logger.warning("加权文案生成失败 (尝试 %d/2): %s", attempt + 1, e)
continue
raise RuntimeError(f"加权文案生成失败: {last_error}")

main.py(433 行改动)

@ -19,8 +19,9 @@ import matplotlib.pyplot as plt
from config_manager import ConfigManager, OUTPUT_DIR from config_manager import ConfigManager, OUTPUT_DIR
from llm_service import LLMService from llm_service import LLMService
from sd_service import SDService, DEFAULT_NEGATIVE, FACE_IMAGE_PATH, SD_PRESET_NAMES, get_sd_preset from sd_service import SDService, DEFAULT_NEGATIVE, FACE_IMAGE_PATH, SD_PRESET_NAMES, get_sd_preset, get_model_profile, get_model_profile_info, detect_model_profile, SD_MODEL_PROFILES
from mcp_client import MCPClient, get_mcp_client from mcp_client import MCPClient, get_mcp_client
from analytics_service import AnalyticsService
# ================= matplotlib 中文字体配置 ================= # ================= matplotlib 中文字体配置 =================
_font_candidates = ["Microsoft YaHei", "SimHei", "PingFang SC", "WenQuanYi Micro Hei"] _font_candidates = ["Microsoft YaHei", "SimHei", "PingFang SC", "WenQuanYi Micro Hei"]
@@ -54,6 +55,7 @@ cfg = ConfigManager()
cfg.ensure_workspace()
mcp = get_mcp_client(cfg.get("mcp_url", "http://localhost:18060/mcp"))
analytics = AnalyticsService(OUTPUT_DIR)

# ==================================================
# LLM 多提供商管理
@@ -149,11 +151,20 @@ def connect_sd(sd_url):
if ok:
models = svc.get_models()
cfg.set("sd_url", sd_url)
first = models[0] if models else None
info = get_model_profile_info(first) if first else "未检测到模型"
return gr.update(choices=models, value=first), f"{msg}", info
return gr.update(choices=[]), f"{msg}", ""
except Exception as e:
logger.error("SD 连接失败: %s", e)
return gr.update(choices=[]), f"❌ SD 连接失败: {e}", ""

def on_sd_model_change(model_name):
"""SD 模型切换时显示模型档案信息"""
if not model_name:
return "未选择模型"
return get_model_profile_info(model_name)

def check_mcp_status(mcp_url):
@@ -287,14 +298,15 @@ def load_saved_face_image():
return None, " 尚未设置头像"

def generate_copy(model, topic, style, sd_model_name, persona_text):
"""生成文案(自动适配 SD 模型的 prompt 风格,支持人设)"""
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "", "", "", "", "❌ 请先配置并连接 LLM 提供商"
try:
svc = LLMService(api_key, base_url, model)
persona = _resolve_persona(persona_text) if persona_text else None
data = svc.generate_copy(topic, style, sd_model_name=sd_model_name, persona=persona)
cfg.set("model", model)
tags = data.get("tags", [])
return (
@@ -499,8 +511,8 @@ def analyze_and_suggest(model, keyword, search_result):
return f"❌ 分析失败: {e}", "", ""

def generate_from_hotspot(model, topic_from_hotspot, style, search_result, sd_model_name, persona_text):
"""基于热点分析生成文案(自动适配 SD 模型,支持人设)"""
if not topic_from_hotspot:
return "", "", "", "", "❌ 请先选择或输入选题"
api_key, base_url, _ = _get_llm_config()
@@ -508,10 +520,13 @@ def generate_from_hotspot(model, topic_from_hotspot, style, search_result):
return "", "", "", "", "❌ 请先配置 LLM 提供商"
try:
svc = LLMService(api_key, base_url, model)
persona = _resolve_persona(persona_text) if persona_text else None
data = svc.generate_copy_with_reference(
topic=topic_from_hotspot,
style=style,
reference_notes=search_result[:2000],
sd_model_name=sd_model_name,
persona=persona,
)
tags = data.get("tags", [])
return (
@@ -1051,6 +1066,7 @@ def _get_stats_summary() -> str:

# ================= 人设池 =================
DEFAULT_PERSONAS = [
"身材管理健身美女,热爱分享好身材秘诀和穿搭显身材技巧",
"温柔知性的时尚博主,喜欢分享日常穿搭和生活美学",
"元气满满的大学生,热爱探店和平价好物分享",
"30岁都市白领丽人,专注通勤穿搭和职场干货",
@@ -1084,6 +1100,20 @@ RANDOM_PERSONA_LABEL = "🎲 随机人设(每次自动切换)"

# 每个人设对应一组相符的评论关键词和主题,切换人设时自动同步
PERSONA_POOL_MAP = {
# ---- 身材管理类 ----
"身材管理健身美女": {
"topics": [
"好身材穿搭", "显身材穿搭", "马甲线养成", "翘臀训练", "直角肩养成",
"天鹅颈锻炼", "小蛮腰秘诀", "腿型矫正", "体态管理", "维密身材",
"居家塑形", "健身穿搭", "运动内衣测评", "蜜桃臀训练", "锁骨养成",
"紧身穿搭", "比基尼身材", "纤腰丰臀", "身材对比照", "自律打卡",
],
"keywords": [
"身材", "好身材", "马甲线", "翘臀", "直角肩", "天鹅颈",
"小蛮腰", "健身女孩", "塑形", "体态", "蜜桃臀", "腰臀比",
"紧身", "显身材", "维密", "锁骨", "A4腰", "漫画腿",
],
},
# ---- 时尚穿搭类 ----
"温柔知性的时尚博主": {
"topics": [
@@ -1777,9 +1807,11 @@ def auto_favorite_once(keywords_str, fav_count, mcp_url):
return f"❌ 收藏失败: {e}"

def _auto_publish_with_log(topics_str, mcp_url, sd_url_val, sd_model_name, model, persona_text, quality_mode_val, face_swap_on):
"""一键发布 + 同步刷新日志"""
msg = auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model,
persona_text=persona_text, quality_mode_val=quality_mode_val,
face_swap_on=face_swap_on)
return msg, get_auto_log()
@@ -1950,8 +1982,8 @@ def auto_reply_once(max_replies, mcp_url, model, persona_text):
return f"❌ 自动回复失败: {e}"

def auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model, persona_text=None, quality_mode_val=None, face_swap_on=False):
"""一键发布:自动生成文案 → 生成图片 → 本地备份 → 发布到小红书(含限额 + 智能权重 + 人设 + 画质)"""
try:
if _is_in_cooldown():
return "⏳ 错误冷却中,请稍后再试"
@@ -1959,6 +1991,14 @@ def auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model, fac
return f"🚫 今日发布已达上限 ({DAILY_LIMITS['publishes']})"
topics = [t.strip() for t in topics_str.split(",") if t.strip()] if topics_str else DEFAULT_TOPICS
use_weights = cfg.get("use_smart_weights", True) and analytics.has_weights
if use_weights:
# 智能加权选题
topic = analytics.get_weighted_topic(topics)
style = analytics.get_weighted_style(DEFAULT_STYLES)
_auto_log_append(f"🧠 [智能] 主题: {topic} | 风格: {style} (加权选择)")
else:
topic = random.choice(topics)
style = random.choice(DEFAULT_STYLES)
_auto_log_append(f"📝 主题: {topic} | 风格: {style} (主题池: {len(topics)} 个)")
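`analytics.get_weighted_topic` is not shown in this diff; one plausible implementation of the weighted topic pick is `random.choices` over per-topic weights, with a small exploration baseline for topics that have no history yet. All names and weight values here are illustrative, not the project's real `AnalyticsService` internals:

```python
import random

# Illustrative weights such as AnalyticsService.calculate_weights might store;
# the real internals are not part of this diff.
topic_weights = {"好身材穿搭": 3.2, "马甲线养成": 2.5, "体态管理": 1.1}

def get_weighted_topic(candidates: list[str], weights: dict[str, float]) -> str:
    """Pick one topic, biased toward historically high-performing ones."""
    # Unseen topics get a small baseline weight so they can still be explored.
    w = [weights.get(t, 0.5) for t in candidates]
    return random.choices(candidates, weights=w, k=1)[0]

topic = get_weighted_topic(["好身材穿搭", "马甲线养成", "居家塑形"], topic_weights)
print(topic)  # one of the three candidates, "好身材穿搭" most often
```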
@@ -1969,12 +2009,40 @@ def auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model, fac
return "❌ LLM 未配置,请先在全局设置中配置提供商"
svc = LLMService(api_key, base_url, model)

# 解析人设(随机/指定)
persona = _resolve_persona(persona_text) if persona_text else None
if persona:
_auto_log_append(f"🎭 人设: {persona[:20]}...")
if use_weights:
# 使用加权文案生成 (携带权重洞察)
weight_insights = f"高权重主题: {', '.join(list(analytics._weights.get('topic_weights', {}).keys())[:5])}\n"
weight_insights += f"权重摘要: {analytics.weights_summary}"
title_advice = analytics.get_title_advice()
hot_tags = ", ".join(analytics.get_top_tags(8))
try:
data = svc.generate_weighted_copy(topic, style, weight_insights, title_advice, hot_tags, sd_model_name=sd_model_name, persona=persona)
_auto_log_append("🧠 使用智能加权文案模板")
except Exception as e:
logger.warning("加权文案生成失败, 退回普通模式: %s", e)
data = svc.generate_copy(topic, style, sd_model_name=sd_model_name, persona=persona)
_auto_log_append("⚠️ 加权模板异常, 使用普通模板")
else:
data = svc.generate_copy(topic, style, sd_model_name=sd_model_name, persona=persona)
title = (data.get("title", "") or "")[:20]
content = data.get("content", "")
sd_prompt = data.get("sd_prompt", "")
tags = data.get("tags", [])
# 如果有高权重标签,补充到 tags 中
if use_weights:
top_tags = analytics.get_top_tags(5)
for t in top_tags:
if t not in tags:
tags.append(t)
tags = tags[:10] # 限制最多10个标签
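The tag top-up above (append high-weight tags the generated copy lacks, capped at 10) can be factored into a small order-preserving helper; the tag lists below are illustrative, with `top_tags` standing in for `analytics.get_top_tags(5)`:

```python
# Sketch of the tag merge above: keep the LLM's tags, append high-weight
# tags that are missing, cap the result at 10 entries.
def merge_tags(tags: list[str], top_tags: list[str], limit: int = 10) -> list[str]:
    merged = list(tags)
    for t in top_tags:
        if t not in merged:
            merged.append(t)
    return merged[:limit]

print(merge_tags(["穿搭", "健身"], ["健身", "马甲线", "自律"]))
# → ['穿搭', '健身', '马甲线', '自律']
```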
if not title:
_record_error()
return "❌ 文案生成失败:无标题"
@@ -1995,7 +2063,7 @@ def auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model, fac
_auto_log_append("⚠️ 换脸已启用但未找到头像,跳过换脸")
images = sd_svc.txt2img(prompt=sd_prompt, model=sd_model_name,
face_image=face_image,
quality_mode=quality_mode_val or "快速 (约30秒)")
if not images:
_record_error()
return "❌ 图片生成失败:没有返回图片"
@@ -2068,7 +2136,7 @@ def _scheduler_loop(comment_enabled, publish_enabled, reply_enabled, like_enable
fav_min, fav_max, fav_count_per_run,
op_start_hour, op_end_hour,
keywords, topics, mcp_url, sd_url_val, sd_model_name,
model, persona_text, quality_mode_val=None, face_swap_on=False):
"""后台定时调度循环(含运营时段、冷却、收藏、统计)"""
_auto_log_append("🤖 自动化调度器已启动")
_auto_log_append(f"⏰ 运营时段: {int(op_start_hour)}:00 - {int(op_end_hour)}:00")
@@ -2162,7 +2230,9 @@ def _scheduler_loop(comment_enabled, publish_enabled, reply_enabled, like_enable
if publish_enabled and now >= next_publish:
try:
_auto_log_append("--- 🔄 执行自动发布 ---")
msg = auto_publish_once(topics, mcp_url, sd_url_val, sd_model_name, model,
persona_text=persona_text, quality_mode_val=quality_mode_val,
face_swap_on=face_swap_on)
_auto_log_append(msg)
except Exception as e:
_auto_log_append(f"❌ 自动发布异常: {e}")
@@ -2201,7 +2271,7 @@ def start_scheduler(comment_on, publish_on, reply_on, like_on, favorite_on,
fav_min, fav_max, fav_count_per_run,
op_start_hour, op_end_hour,
keywords, topics, mcp_url, sd_url_val, sd_model_name,
model, persona_text, quality_mode_val, face_swap_on):
"""启动定时自动化"""
global _auto_thread
if _auto_running.is_set():
@@ -2227,7 +2297,7 @@ def start_scheduler(comment_on, publish_on, reply_on, like_on, favorite_on,
op_start_hour, op_end_hour,
keywords, topics, mcp_url, sd_url_val, sd_model_name,
model, persona_text),
kwargs={"quality_mode_val": quality_mode_val, "face_swap_on": face_swap_on},
daemon=True,
)
_auto_thread.start()
@@ -2284,6 +2354,183 @@ def get_scheduler_status():
return "⚪ **调度器未运行**"
# ==================================================
# 智能学习 & 笔记分析模块
# ==================================================
# 定时学习状态
_learn_running = threading.Event()
_learn_thread: threading.Thread | None = None
def analytics_collect_data(mcp_url, user_id, xsec_token):
"""采集笔记表现数据"""
if not user_id or not xsec_token:
return "❌ 请先填写用户 ID 和 xsec_token (在「账号登录」Tab 获取)"
try:
client = get_mcp_client(mcp_url)
result = analytics.collect_note_performance(client, user_id, xsec_token)
if "error" in result:
return f"❌ 数据采集失败: {result['error']}"
return (
f"✅ 数据采集完成!\n"
f"📝 总笔记数: {result['total']}\n"
f"🔄 更新: {result['updated']}\n\n"
f"💡 点击「计算权重」进行智能学习"
)
except Exception as e:
logger.error("数据采集失败: %s", e)
return f"❌ 采集失败: {e}"
def analytics_calculate_weights():
"""计算内容权重"""
try:
result = analytics.calculate_weights()
if "error" in result:
return "❌ " + result["error"], analytics.generate_report()
top = result.get("top_note")
top_str = f" | 🏆 最佳: {top['title']} (❤️ {top.get('likes', 0)})" if top else ""
msg = (
f"✅ 权重计算完成!\n"
f"📊 分析了 {result['total_notes']} 篇笔记{top_str}\n\n"
f"💡 权重已自动保存,启用「智能加权发布」后自动生效"
)
return msg, analytics.generate_report()
except Exception as e:
logger.error("权重计算失败: %s", e)
return f"❌ 计算失败: {e}", ""
def analytics_llm_deep_analysis(model):
"""LLM 深度分析笔记表现"""
note_data = analytics.generate_llm_analysis_prompt()
if not note_data:
return "❌ 暂无笔记数据,请先采集"
try:
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "❌ LLM 未配置"
svc = LLMService(api_key, base_url, model)
result = svc.analyze_note_performance(note_data)
lines = ["## 🧠 AI 深度分析报告\n"]
if result.get("high_perform_features"):
lines.append(f"### ✅ 高表现内容特征\n{result['high_perform_features']}\n")
if result.get("low_perform_issues"):
lines.append(f"### ⚠️ 低表现内容反思\n{result['low_perform_issues']}\n")
if result.get("user_preference"):
lines.append(f"### 👤 用户偏好画像\n{result['user_preference']}\n")
suggestions = result.get("content_suggestions", [])
if suggestions:
lines.append("### 📌 内容方向建议")
for s in suggestions:
priority = "🔴" if s.get("priority", 3) <= 2 else "🟡" if s.get("priority", 3) <= 3 else "🟢"
lines.append(f"- {priority} **{s.get('topic', '')}**: {s.get('reason', '')}")
lines.append("")
templates = result.get("title_templates", [])
if templates:
lines.append("### ✏️ 标题模板")
for t in templates:
lines.append(f"- 📝 {t}")
lines.append("")
tags = result.get("recommended_tags", [])
if tags:
lines.append(f"### 🏷️ 推荐标签\n{' '.join(f'`#{t}`' for t in tags)}\n")
return "\n".join(lines)
except Exception as e:
logger.error("LLM 分析失败: %s", e)
return f"❌ AI 分析失败: {e}"
def analytics_get_report():
"""获取分析报告"""
return analytics.generate_report()
def analytics_get_weighted_topics():
"""获取加权主题列表"""
weighted = analytics.get_weighted_topics_display()
if weighted:
return weighted
return "暂无权重数据,请先执行「采集数据 → 计算权重」"
def _learn_scheduler_loop(mcp_url, user_id, xsec_token, model, interval_hours):
"""定时学习后台循环"""
logger.info("定时学习已启动, 间隔 %s 小时", interval_hours)
_auto_log_append(f"🧠 定时学习已启动, 每 {interval_hours} 小时自动分析一次")
while _learn_running.is_set():
try:
# 采集数据
client = get_mcp_client(mcp_url)
result = analytics.collect_note_performance(client, user_id, xsec_token)
if "error" not in result:
_auto_log_append(f"🧠 自动采集完成: {result['total']} 篇笔记, 更新 {result['updated']}")
# 计算权重
weight_result = analytics.calculate_weights()
if "error" not in weight_result:
_auto_log_append(f"🧠 权重更新完成: 分析 {weight_result['total_notes']}")
# LLM 深度分析 (如果有配置)
api_key, base_url, _ = _get_llm_config()
if api_key and model:
try:
note_data = analytics.generate_llm_analysis_prompt()
if note_data:
svc = LLMService(api_key, base_url, model)
svc.analyze_note_performance(note_data)
_auto_log_append("🧠 AI 深度分析完成")
except Exception as e:
_auto_log_append(f"⚠️ AI 分析失败 (非致命): {e}")
else:
_auto_log_append(f"⚠️ 自动采集失败: {result.get('error', '未知')}")
except Exception as e:
_auto_log_append(f"⚠️ 定时学习异常: {e}")
# 等待下一次执行
wait_seconds = interval_hours * 3600
for _ in range(int(wait_seconds / 5)):
if not _learn_running.is_set():
break
time.sleep(5)
logger.info("定时学习已停止")
_auto_log_append("🧠 定时学习已停止")
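The wait loop above polls `_learn_running` in 5-second slices so a stop request takes effect quickly. The same interruptible delay can be expressed without polling via `threading.Event.wait(timeout)`. Note this sketch inverts the flag — it waits on a *stop* event, while the project keeps a *running* event set — and `stop_requested` is an illustrative name, not a project object:

```python
import threading
import time

# Waiting on a "stop" event gives an interruptible sleep without polling.
stop_requested = threading.Event()

def interruptible_sleep(stop: threading.Event, seconds: float) -> bool:
    """Sleep up to `seconds`; return True if a stop was requested meanwhile."""
    # Event.wait returns True as soon as the event is set, False on timeout.
    return stop.wait(timeout=seconds)

t0 = time.monotonic()
threading.Timer(0.1, stop_requested.set).start()  # simulate a stop request
stopped = interruptible_sleep(stop_requested, 5.0)
elapsed = time.monotonic() - t0
print(stopped)  # → True (returns well before the 5 s timeout)
```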
def start_learn_scheduler(mcp_url, user_id, xsec_token, model, interval_hours):
"""启动定时学习"""
global _learn_thread
if _learn_running.is_set():
return "⚠️ 定时学习已在运行中"
if not user_id or not xsec_token:
return "❌ 请先在「账号登录」获取用户 ID 和 Token"
_learn_running.set()
_learn_thread = threading.Thread(
target=_learn_scheduler_loop,
args=(mcp_url, user_id, xsec_token, model, interval_hours),
daemon=True,
)
_learn_thread.start()
return f"✅ 定时学习已启动 🧠 每 {int(interval_hours)} 小时自动分析"
def stop_learn_scheduler():
"""停止定时学习"""
if not _learn_running.is_set():
return "⚠️ 定时学习未在运行"
_learn_running.clear()
return "🛑 定时学习已停止"
# ==================================================
# Windows 开机自启管理
# ==================================================
@@ -2485,6 +2732,7 @@ with gr.Blocks(
label="SD 模型", allow_custom_value=True,
interactive=True, scale=2,
)
sd_model_info = gr.Markdown("选择模型后显示适配信息", elem_id="sd_model_info")
status_bar = gr.Markdown("🔄 等待连接...")

gr.Markdown("---")
@@ -2867,7 +3115,91 @@
label="笔记数据明细",
)

# -------- Tab 6: 智能学习 --------
with gr.Tab("🧠 智能学习"):
gr.Markdown(
"### 🧠 智能内容学习引擎\n"
"> 自动分析已发布笔记的表现,学习哪些内容受欢迎,用权重指导未来创作\n\n"
"**工作流程**: 采集数据 → 计算权重 → AI 深度分析 → 自动优化创作\n\n"
"💡 启用后,自动发布将优先生成高权重主题的内容"
)
with gr.Row():
# 左栏: 数据采集 & 权重计算
with gr.Column(scale=1):
gr.Markdown("#### 📊 数据采集")
learn_user_id = gr.Textbox(
label="用户 ID", value=config.get("my_user_id", ""),
interactive=True,
)
learn_xsec_token = gr.Textbox(
label="xsec_token", value=config.get("xsec_token", ""),
interactive=True,
)
btn_learn_collect = gr.Button(
"📊 采集笔记数据", variant="primary", size="lg",
)
learn_collect_status = gr.Markdown("")
gr.Markdown("---")
gr.Markdown("#### ⚖️ 权重计算")
btn_learn_calc = gr.Button(
"⚖️ 计算内容权重", variant="primary", size="lg",
)
learn_calc_status = gr.Markdown("")
gr.Markdown("---")
gr.Markdown("#### 🤖 AI 深度分析")
gr.Markdown("> 用 LLM 分析笔记数据,找出内容规律,生成策略建议")
btn_learn_ai = gr.Button(
"🧠 AI 深度分析", variant="primary", size="lg",
)
gr.Markdown("---")
gr.Markdown("#### ⏰ 定时自动学习")
gr.Markdown("> 每隔 N 小时自动采集数据 + 计算权重 + AI 分析")
learn_interval = gr.Number(
label="学习间隔 (小时)", value=6, minimum=1, maximum=48,
)
with gr.Row():
btn_learn_start = gr.Button(
"▶ 启动定时学习", variant="primary", size="sm",
)
btn_learn_stop = gr.Button(
"⏹ 停止", variant="stop", size="sm",
)
learn_sched_status = gr.Markdown("⚪ 定时学习未启动")
gr.Markdown("---")
gr.Markdown("#### 🎯 加权主题预览")
gr.Markdown("> 当前权重最高的主题 (自动发布会优先选择)")
btn_show_topics = gr.Button("🔄 刷新加权主题", size="sm")
learn_weighted_topics = gr.Textbox(
label="加权主题池 (权重从高到低)",
value=analytics.get_weighted_topics_display() or "暂无权重数据",
interactive=False,
lines=2,
)
learn_use_weights = gr.Checkbox(
label="🧠 自动发布时使用智能权重 (推荐)",
value=cfg.get("use_smart_weights", True),
interactive=True,
)
# 右栏: 分析报告
with gr.Column(scale=2):
gr.Markdown("#### 📋 智能学习报告")
learn_report = gr.Markdown(
value=analytics.generate_report(),
label="分析报告",
)
gr.Markdown("---")
learn_ai_report = gr.Markdown(
value="*点击「AI 深度分析」生成*",
label="AI 深度分析报告",
)
# -------- Tab 7: 自动运营 --------
with gr.Tab("🤖 自动运营"):
gr.Markdown(
"### 🤖 无人值守自动化运营\n"
@@ -3097,7 +3429,11 @@
)
btn_connect_sd.click(
fn=connect_sd, inputs=[sd_url],
outputs=[sd_model, status_bar, sd_model_info],
)
sd_model.change(
fn=on_sd_model_change, inputs=[sd_model],
outputs=[sd_model_info],
)
btn_check_mcp.click(
fn=check_mcp_status, inputs=[mcp_url],
@@ -3114,18 +3450,18 @@

# ---- Tab 1: 内容创作 ----
btn_gen_copy.click(
fn=generate_copy,
inputs=[llm_model, topic, style, sd_model, persona],
outputs=[res_title, res_content, res_prompt, res_tags, status_bar],
)

# 生成模式切换 → 同步更新步数/CFG预览
def on_quality_mode_change(mode, sd_model_val):
p = get_sd_preset(mode, sd_model_val)
return p["steps"], p["cfg_scale"]

quality_mode.change(
fn=on_quality_mode_change,
inputs=[quality_mode, sd_model],
outputs=[steps, cfg_scale],
)
@@ -3168,7 +3504,7 @@
btn_gen_from_hot.click(
fn=generate_from_hotspot,
inputs=[llm_model, topic_from_hot, hot_style, search_output, sd_model, persona],
outputs=[hot_title, hot_content, hot_prompt, hot_tags, hot_gen_status],
)
@@ -3278,7 +3614,44 @@
outputs=[data_status, profile_card, chart_interact, chart_notes, notes_detail],
)

# ---- Tab 6: 智能学习 ----
btn_learn_collect.click(
fn=analytics_collect_data,
inputs=[mcp_url, learn_user_id, learn_xsec_token],
outputs=[learn_collect_status],
)
btn_learn_calc.click(
fn=analytics_calculate_weights,
inputs=[],
outputs=[learn_calc_status, learn_report],
)
btn_learn_ai.click(
fn=analytics_llm_deep_analysis,
inputs=[llm_model],
outputs=[learn_ai_report],
)
btn_learn_start.click(
fn=start_learn_scheduler,
inputs=[mcp_url, learn_user_id, learn_xsec_token, llm_model, learn_interval],
outputs=[learn_sched_status],
)
btn_learn_stop.click(
fn=stop_learn_scheduler,
inputs=[],
outputs=[learn_sched_status],
)
btn_show_topics.click(
fn=analytics_get_weighted_topics,
inputs=[],
outputs=[learn_weighted_topics],
)
learn_use_weights.change(
fn=lambda v: cfg.set("use_smart_weights", v) or ("✅ 智能权重已启用" if v else "⚪ 智能权重已关闭"),
inputs=[learn_use_weights],
outputs=[learn_sched_status],
)
# ---- Tab 7: 自动运营 ----
# 人设切换 → 联动更新评论关键词池和主题池
persona.change(
fn=on_persona_changed,
@@ -3308,7 +3681,7 @@
)
btn_auto_publish.click(
fn=_auto_publish_with_log,
inputs=[auto_publish_topics, mcp_url, sd_url, sd_model, llm_model, persona, quality_mode, face_swap_toggle],
outputs=[auto_publish_result, auto_log_display],
)
btn_start_sched.click(
@@ -3322,7 +3695,7 @@
sched_start_hour, sched_end_hour,
auto_comment_keywords, auto_publish_topics,
mcp_url, sd_url, sd_model, llm_model, persona,
quality_mode, face_swap_toggle],
outputs=[sched_result],
)
btn_stop_sched.click(

sd_service.py

@@ -16,9 +16,156 @@ SD_TIMEOUT = 1800  # 图片生成可能需要较长时间

# 头像文件默认保存路径
FACE_IMAGE_PATH = os.path.join(os.path.dirname(__file__), "my_face.png")

# ==================== 多模型配置系统 ====================
# 每个模型的最优参数、prompt 增强词、负面提示词、三档预设
SD_MODEL_PROFILES = {
# ---- majicmixRealistic: 东亚网红感,朋友圈自拍/美妆/穿搭 (SD 1.5) ----
"majicmixRealistic": {
"display_name": "majicmixRealistic ⭐⭐⭐⭐⭐",
"description": "东亚网红感 | 朋友圈自拍、美妆、穿搭",
"arch": "sd15", # SD 1.5 架构
# 自动追加到 prompt 前面的增强词
"prompt_prefix": (
"(best quality:1.4), (masterpiece:1.4), (ultra detailed:1.3), "
"(photorealistic:1.4), (realistic:1.3), raw photo, "
"(asian girl:1.3), (chinese:1.2), (east asian features:1.2), "
"(delicate facial features:1.2), (fair skin:1.1), (natural skin texture:1.2), "
"(soft lighting:1.1), (natural makeup:1.1), "
),
# 自动追加到 prompt 后面的补充词
"prompt_suffix": (
", film grain, shallow depth of field, "
"instagram aesthetic, xiaohongshu style, phone camera feel"
),
"negative_prompt": (
"(nsfw:1.5), (nudity:1.5), (worst quality:2), (low quality:2), (normal quality:2), "
"lowres, bad anatomy, bad hands, text, error, missing fingers, "
"extra digit, fewer digits, cropped, jpeg artifacts, signature, watermark, "
"blurry, deformed, mutated, disfigured, ugly, duplicate, "
"poorly drawn face, poorly drawn hands, extra limbs, fused fingers, "
"too many fingers, long neck, out of frame, "
"western face, european face, caucasian, deep-set eyes, high nose bridge, "
"blonde hair, red hair, blue eyes, green eyes, freckles, thick body hair, "
"painting, cartoon, anime, sketch, illustration, 3d render"
),
"presets": {
"快速 (约30秒)": {
"steps": 20,
"cfg_scale": 7.0,
"width": 512,
"height": 768,
"sampler_name": "Euler a",
"scheduler": "Normal",
"batch_size": 2,
},
"标准 (约1分钟)": {
"steps": 30,
"cfg_scale": 7.0,
"width": 512,
"height": 768,
"sampler_name": "DPM++ 2M",
"scheduler": "Karras",
"batch_size": 2,
},
"精细 (约2-3分钟)": {
"steps": 40,
"cfg_scale": 7.5,
"width": 576,
"height": 864,
"sampler_name": "DPM++ SDE",
"scheduler": "Karras",
"batch_size": 2,
},
},
},
# ---- Realistic Vision: 写实摄影感,纪实摄影/街拍/真实质感 (SD 1.5) ----
"realisticVision": {
"display_name": "Realistic Vision ⭐⭐⭐⭐",
"description": "写实摄影感 | 纪实摄影、街拍、真实质感",
"arch": "sd15",
"prompt_prefix": (
"RAW photo, (best quality:1.4), (masterpiece:1.3), (realistic:1.4), "
"(photorealistic:1.4), 8k uhd, DSLR, high quality, "
"(asian:1.2), (chinese girl:1.2), (east asian features:1.1), "
"(natural skin:1.2), (skin pores:1.1), (detailed skin texture:1.2), "
),
"prompt_suffix": (
", shot on Canon EOS R5, 85mm lens, f/1.8, "
"natural lighting, documentary style, street photography, "
"film color grading, depth of field"
),
"negative_prompt": (
"(nsfw:1.5), (nudity:1.5), (worst quality:2), (low quality:2), (normal quality:2), "
"lowres, bad anatomy, bad hands, text, error, missing fingers, "
"extra digit, fewer digits, cropped, jpeg artifacts, signature, watermark, "
"blurry, deformed, mutated, disfigured, ugly, duplicate, "
"poorly drawn face, extra limbs, fused fingers, long neck, "
"western face, european face, caucasian, deep-set eyes, "
"blonde hair, blue eyes, green eyes, freckles, "
"painting, cartoon, anime, sketch, illustration, 3d render, "
"over-sharpened, over-saturated, plastic skin, airbrushed, "
"smooth skin, doll-like, HDR, overprocessed"
),
"presets": {
"快速 (约30秒)": {
"steps": 20,
"cfg_scale": 7.0,
"width": 512,
"height": 768,
"sampler_name": "Euler a",
"scheduler": "Normal",
"batch_size": 2,
},
"标准 (约1分钟)": {
"steps": 28,
"cfg_scale": 7.0,
"width": 512,
"height": 768,
"sampler_name": "DPM++ 2M",
"scheduler": "Karras",
"batch_size": 2,
},
"精细 (约2-3分钟)": {
"steps": 40,
"cfg_scale": 7.5,
"width": 576,
"height": 864,
"sampler_name": "DPM++ SDE",
"scheduler": "Karras",
"batch_size": 2,
},
},
},
# ---- Juggernaut XL: 电影大片感,高画质/商业摄影/复杂背景 (SDXL) ----
"juggernautXL": {
"display_name": "Juggernaut XL ⭐⭐⭐⭐",
"description": "电影大片感 | 高画质、商业摄影、复杂背景",
"arch": "sdxl", # SDXL 架构
"prompt_prefix": (
"masterpiece, best quality, ultra detailed, 8k uhd, high resolution, "
"photorealistic, cinematic lighting, cinematic composition, "
"asian girl, chinese, east asian features, black hair, dark brown eyes, "
"delicate facial features, fair skin, slim figure, "
),
"prompt_suffix": (
", cinematic color grading, anamorphic lens, bokeh, "
"volumetric lighting, ray tracing, global illumination, "
"commercial photography, editorial style, vogue aesthetic"
),
"negative_prompt": (
"nsfw, nudity, lowres, bad anatomy, bad hands, text, error, missing fingers, "
"extra digit, fewer digits, cropped, worst quality, low quality, normal quality, "
"jpeg artifacts, signature, watermark, blurry, deformed, mutated, disfigured, "
"ugly, duplicate, morbid, mutilated, poorly drawn face, poorly drawn hands, "
"extra limbs, fused fingers, too many fingers, long neck, username, "
"out of frame, distorted, oversaturated, underexposed, overexposed, "
"western face, european face, caucasian, deep-set eyes, high nose bridge, "
"blonde hair, red hair, blue eyes, green eyes, freckles, thick body hair"
),
"presets": {
"快速 (约30秒)": {
"steps": 12,
"cfg_scale": 5.0,
@@ -46,27 +193,73 @@ SD_PRESETS = {
"scheduler": "Karras",
"batch_size": 2,
},
},
},
}
# 默认配置 profile key
DEFAULT_MODEL_PROFILE = "juggernautXL"
def detect_model_profile(model_name: str) -> str:
"""根据 SD 模型名称自动识别对应的 profile key"""
name_lower = model_name.lower() if model_name else ""
if "majicmix" in name_lower or "majic" in name_lower:
return "majicmixRealistic"
elif "realistic" in name_lower and "vision" in name_lower:
return "realisticVision"
elif "rv" in name_lower and ("v5" in name_lower or "v6" in name_lower or "v4" in name_lower):
return "realisticVision" # RV v5.1 等简写
elif "juggernaut" in name_lower or "jugger" in name_lower:
return "juggernautXL"
# 根据架构猜测
elif "xl" in name_lower or "sdxl" in name_lower:
return "juggernautXL" # SDXL 架构默认用 Juggernaut 参数
else:
return DEFAULT_MODEL_PROFILE # 无法识别时默认
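The filename heuristic above can be exercised in isolation. This block re-states the matching rules as a standalone function for a quick self-check (the project itself imports `detect_model_profile` from `sd_service`; the checkpoint filenames below are examples, not files shipped with the project):

```python
# Re-implementation of the filename heuristic above, for a quick self-check.
def detect_model_profile(model_name: str) -> str:
    name = (model_name or "").lower()
    if "majicmix" in name or "majic" in name:
        return "majicmixRealistic"
    if "realistic" in name and "vision" in name:
        return "realisticVision"
    if "rv" in name and any(v in name for v in ("v4", "v5", "v6")):
        return "realisticVision"  # RV v5.1 等简写
    if "juggernaut" in name or "jugger" in name:
        return "juggernautXL"
    if "xl" in name or "sdxl" in name:
        return "juggernautXL"  # SDXL 架构默认用 Juggernaut 参数
    return "juggernautXL"  # DEFAULT_MODEL_PROFILE

print(detect_model_profile("majicmixRealistic_v7.safetensors"))   # → majicmixRealistic
print(detect_model_profile("realisticVisionV60B1.safetensors"))   # → realisticVision
print(detect_model_profile("juggernautXL_v9.safetensors"))        # → juggernautXL
print(detect_model_profile("unknownModel.ckpt"))                  # → juggernautXL (fallback)
```

Because the checks run in order, a name like "realisticVisionXL" would match the Realistic Vision rule before the SDXL fallback; ordering is part of the heuristic.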
def get_model_profile(model_name: str = None) -> dict:
"""获取模型配置 profile"""
key = detect_model_profile(model_name) if model_name else DEFAULT_MODEL_PROFILE
return SD_MODEL_PROFILES.get(key, SD_MODEL_PROFILES[DEFAULT_MODEL_PROFILE])
def get_model_profile_info(model_name: str = None) -> str:
"""获取当前模型的显示信息 (Markdown 格式)"""
profile = get_model_profile(model_name)
key = detect_model_profile(model_name) if model_name else DEFAULT_MODEL_PROFILE
presets = profile["presets"]
first_preset = list(presets.values())[0]
res = f"{first_preset.get('width', '?')}×{first_preset.get('height', '?')}"
lines = [
f"**🎨 {profile['display_name']}** | `{profile['arch'].upper()}` | {res}",
f"> {profile['description']}",
]
if model_name and not any(k in (model_name or "").lower() for k in ["majicmix", "realistic", "juggernaut"]):
lines.append(f"> ⚠️ 未识别的模型,使用默认档案 ({profile['display_name']})")
return "\n".join(lines)
# ==================== 兼容旧接口 ====================
# 默认预设和反向提示词 (使用 Juggernaut XL 作为默认)
SD_PRESETS = SD_MODEL_PROFILES[DEFAULT_MODEL_PROFILE]["presets"]
SD_PRESET_NAMES = list(SD_PRESETS.keys())

def get_sd_preset(name: str, model_name: str = None) -> dict:
"""获取生成预设参数,自动适配模型"""
profile = get_model_profile(model_name)
presets = profile.get("presets", SD_PRESETS)
return presets.get(name, presets.get("标准 (约1分钟)", list(presets.values())[0]))
# 默认反向提示词(同 Juggernaut XL)
DEFAULT_NEGATIVE = SD_MODEL_PROFILES[DEFAULT_MODEL_PROFILE]["negative_prompt"]

class SDService:
@@ -201,7 +394,7 @@ class SDService:
def txt2img(
self,
prompt: str,
negative_prompt: str = None,
model: str = None,
steps: int = None,
cfg_scale: float = None,
@@ -214,21 +407,34 @@
face_image: Image.Image = None,
quality_mode: str = None,
) -> list[Image.Image]:
"""文生图(自动适配当前 SD 模型的最优参数)

Args:
model: SD 模型名,自动识别并应用对应配置
face_image: 头像 PIL Image,传入后自动启用 ReActor 换脸
quality_mode: 预设模式名
"""
if model:
self.switch_model(model)

# 自动识别模型配置
profile = get_model_profile(model)
profile_key = detect_model_profile(model)
logger.info("🎯 SD 模型识别: %s%s (%s)",
model or "默认", profile_key, profile["description"])

# 加载模型专属预设参数
preset = get_sd_preset(quality_mode, model) if quality_mode else get_sd_preset("标准 (约1分钟)", model)

# 自动增强 prompt: 前缀 + 原始 prompt + 后缀
enhanced_prompt = profile.get("prompt_prefix", "") + prompt + profile.get("prompt_suffix", "")

# 使用模型专属反向提示词
final_negative = negative_prompt if negative_prompt is not None else profile.get("negative_prompt", DEFAULT_NEGATIVE)

payload = {
"prompt": enhanced_prompt,
"negative_prompt": final_negative,
"steps": steps if steps is not None else preset["steps"],
"cfg_scale": cfg_scale if cfg_scale is not None else preset["cfg_scale"],
"width": width if width is not None else preset["width"],
@@ -238,8 +444,8 @@
"sampler_name": sampler_name if sampler_name is not None else preset["sampler_name"],
"scheduler": scheduler if scheduler is not None else preset["scheduler"],
}
logger.info("SD 生成参数 [%s]: steps=%s, cfg=%.1f, %dx%d, sampler=%s",
payload['steps'], payload['cfg_scale'], profile_key, payload['steps'], payload['cfg_scale'],
payload['width'], payload['height'], payload['sampler_name']) payload['width'], payload['height'], payload['sampler_name'])
# 如果提供了头像,通过 ReActor 换脸 # 如果提供了头像,通过 ReActor 换脸
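The `txt2img` rewiring reduces to two small patterns: wrapping the user prompt with the profile's prefix/suffix, and letting an explicit argument win over the preset (with `None` meaning "use the preset"). A standalone sketch — `build_payload` and the sample dicts are hypothetical, not names from `sd_service.py`:

```python
def build_payload(prompt, profile, preset, steps=None, cfg_scale=None, negative_prompt=None):
    """Explicit arguments override preset values; None means 'take it from the preset'."""
    # Prompt enhancement: prefix + original prompt + suffix
    enhanced = profile.get("prompt_prefix", "") + prompt + profile.get("prompt_suffix", "")
    return {
        "prompt": enhanced,
        # Model-specific negative prompt unless the caller passed one explicitly
        "negative_prompt": negative_prompt if negative_prompt is not None else profile.get("negative_prompt", ""),
        "steps": steps if steps is not None else preset["steps"],
        "cfg_scale": cfg_scale if cfg_scale is not None else preset["cfg_scale"],
    }

profile = {"prompt_prefix": "masterpiece, ", "prompt_suffix": ", 8k", "negative_prompt": "lowres"}
preset = {"steps": 30, "cfg_scale": 5.0}
payload = build_payload("a cat", profile, preset, steps=40)
# steps comes from the explicit argument, cfg_scale from the preset
```

Testing `x if x is not None else default` rather than `x or default` matters here: it keeps legitimate falsy overrides such as `cfg_scale=0` or an empty negative prompt from being silently replaced by the preset.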
@@ -264,30 +470,37 @@ class SDService:
         self,
         init_image: Image.Image,
         prompt: str,
-        negative_prompt: str = DEFAULT_NEGATIVE,
+        negative_prompt: str = None,
         denoising_strength: float = 0.5,
         steps: int = 30,
-        cfg_scale: float = 5.0,
-        sampler_name: str = "DPM++ 2M",
-        scheduler: str = "Karras",
+        cfg_scale: float = None,
+        sampler_name: str = None,
+        scheduler: str = None,
+        model: str = None,
     ) -> list[Image.Image]:
-        """图生图(参数针对 JuggernautXL 优化)"""
+        """图生图(自动适配模型参数)"""
+        profile = get_model_profile(model)
+        preset = get_sd_preset("标准 (约1分钟)", model)
+
         # 将 PIL Image 转为 base64
         buf = io.BytesIO()
         init_image.save(buf, format="PNG")
         init_b64 = base64.b64encode(buf.getvalue()).decode("utf-8")

+        enhanced_prompt = profile.get("prompt_prefix", "") + prompt + profile.get("prompt_suffix", "")
+        final_negative = negative_prompt if negative_prompt is not None else profile.get("negative_prompt", DEFAULT_NEGATIVE)
+
         payload = {
             "init_images": [init_b64],
-            "prompt": prompt,
-            "negative_prompt": negative_prompt,
+            "prompt": enhanced_prompt,
+            "negative_prompt": final_negative,
             "denoising_strength": denoising_strength,
             "steps": steps,
-            "cfg_scale": cfg_scale,
+            "cfg_scale": cfg_scale if cfg_scale is not None else preset["cfg_scale"],
             "width": init_image.width,
             "height": init_image.height,
-            "sampler_name": sampler_name if sampler_name is not None else preset["sampler_name"],
-            "scheduler": scheduler,
+            "sampler_name": sampler_name if sampler_name is not None else preset["sampler_name"],
+            "scheduler": scheduler if scheduler is not None else preset["scheduler"],
         }
         resp = requests.post(
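The base64 step in `img2img` is the only serialization the WebUI payload needs: PNG bytes in, ASCII string out. The helper below isolates that round trip using only the standard library; `save_fn` is a hypothetical stand-in for `init_image.save(buf, format="PNG")` on a real PIL Image, so the sketch stays runnable without Pillow installed:

```python
import base64
import io

def image_to_b64(save_fn) -> str:
    """Serialize an image into the base64 string expected in 'init_images'.

    save_fn(buf) must write the encoded image bytes into buf — in
    sd_service.py this is a PIL Image's .save(buf, format="PNG").
    """
    buf = io.BytesIO()
    save_fn(buf)
    return base64.b64encode(buf.getvalue()).decode("utf-8")

# Stand-in bytes beginning with the PNG signature, in place of a real encode
fake_png = b"\x89PNG\r\n\x1a\n" + b"payload"
b64 = image_to_b64(lambda buf: buf.write(fake_png))
```

Decoding `b64` on the server side yields the original bytes unchanged, which is why the payload can carry arbitrary binary image data inside a JSON string.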