- 新增智能选题引擎 `TopicEngine`,整合热点数据与历史权重,提供多维度评分和创作角度建议 - 新增内容模板系统 `ContentTemplate`,支持从 JSON 文件加载模板并应用于文案生成 - 新增批量创作功能 `batch_generate_copy`,支持串行生成多篇文案并自动入草稿队列 - 升级文案质量流水线:实现 Prompt 分层架构(基础层 + 风格层 + 人设层)、LLM 自检与改写机制、深度去 AI 化后处理 - 优化图文协同:新增封面图策略选择、SD prompt 与文案语义联动、图文匹配度评估 - 集成数据闭环:在文案生成中自动注入 `AnalyticsService` 权重数据,实现发布 → 数据回收 → 优化创作的完整循环 - 更新 UI 组件:新增选题推荐展示区、批量创作折叠面板、封面图策略选择器和图文匹配度评分展示 ♻️ refactor(llm): 重构 Prompt 架构并增强去 AI 化处理 - 将 `PROMPT_COPYWRITING` 拆分为分层架构(基础层 + 风格层 + 人设层),提高维护性和灵活性 - 增强 `_humanize_content` 方法:新增语气词注入、标点不规范化、段落节奏打散和 emoji 密度控制 - 新增 `_self_check` 和 `_self_check_rewrite` 方法,实现文案 AI 痕迹自检与自动改写 - 新增 `evaluate_image_text_match` 方法,支持文案与 SD prompt 的语义匹配度评估(可选,失败不阻塞) - 新增封面图策略配置 `COVER_STRATEGIES` 和情感基调映射 `EMOTION_SD_MAP` 📝 docs(openspec): 归档内容创作优化提案和详细规格 - 新增 `openspec/changes/archive/2026-02-28-optimize-content-creation/` 目录,包含设计文档、提案、规格说明和任务清单 - 新增 `openspec/specs/` 下的批量创作、文案质量流水线、图文协同、服务内容和智能选题引擎规格文档 - 更新 `openspec/specs/services-content/spec.md`,反映新增的批量创作和智能选题入口函数 🔧 chore(config): 更新服务配置和 UI 集成 - 在 `services/content.py` 中集成权重数据自动注入逻辑,实现数据驱动创作 - 在 `ui/app.py` 中新增选题推荐、批量生成和图文匹配度评估的回调函数 - 在 `ui/tab_create.py` 中新增智能选题推荐区、批量创作面板和图文匹配度评估组件 - 修复 `services/sd_service.py` 中的头像文件路径问题,确保目录存在
408 lines
14 KiB
Python
408 lines
14 KiB
Python
"""
|
||
services/content.py
|
||
文案生成、图片生成、一键导出、发布到小红书
|
||
"""
|
||
import os
|
||
import re
|
||
import time
|
||
import platform
|
||
import subprocess
|
||
import logging
|
||
|
||
from PIL import Image
|
||
|
||
from .config_manager import ConfigManager, OUTPUT_DIR
|
||
from .llm_service import LLMService
|
||
from .sd_service import SDService, get_sd_preset
|
||
from .mcp_client import get_mcp_client
|
||
from .connection import _get_llm_config
|
||
from .persona import _resolve_persona
|
||
|
||
# Module-wide logger shared by all service functions in this file.
logger = logging.getLogger("autobot")
# Persistent app configuration; used below to remember the last-used LLM model.
cfg = ConfigManager()
|
||
|
||
def generate_copy(model, topic, style, sd_model_name, persona_text):
    """Generate a single piece of copy (adapts to the SD model, supports
    persona, auto-injects analytics weight data when available).

    Prefers the analytics-weighted generation path (data loop 9.1) and
    silently falls back to plain generation when no weight data exists or
    the weighted path fails.

    Returns:
        (title, content, sd_prompt, tags_str, status_msg)
    """
    api_key, base_url, _ = _get_llm_config()
    if not api_key:
        return "", "", "", "", "❌ 请先配置并连接 LLM 提供商"

    try:
        svc = LLMService(api_key, base_url, model)
        persona = _resolve_persona(persona_text) if persona_text else None

        # Best-effort attempt to inject historical weight data (data loop 9.1).
        data = None
        try:
            from .analytics_service import AnalyticsService
            analytics = AnalyticsService()
            if analytics.has_weights:
                data = svc.generate_weighted_copy(
                    topic, style,
                    weight_insights=analytics.weights_summary,
                    title_advice=analytics.get_title_advice(),
                    hot_tags=", ".join(analytics.get_top_tags(8)),
                    sd_model_name=sd_model_name,
                    persona=persona,
                )
                logger.info("使用加权文案生成路径(权重数据已注入)")
        except Exception as e:
            logger.debug("权重数据注入跳过: %s", e)

        # No weights, or the weighted path failed: fall back to basic generation.
        if data is None:
            data = svc.generate_copy(topic, style, sd_model_name=sd_model_name, persona=persona)

        cfg.set("model", model)
        tag_list = data.get("tags", [])
        tags_str = ", ".join(tag_list) if tag_list else ""
        return (
            data.get("title", ""),
            data.get("content", ""),
            data.get("sd_prompt", ""),
            tags_str,
            "✅ 文案生成完毕",
        )
    except Exception as e:
        logger.error("文案生成失败: %s", e)
        return "", "", "", "", f"❌ 生成失败: {e}"
|
||
|
||
|
||
def generate_images(sd_url, prompt, neg_prompt, model, steps, cfg_scale, face_swap_on, face_img, quality_mode, persona_text=None, enhance_level: float = 1.0):
    """Generate images via Stable Diffusion (optional ReActor face swap,
    quality-mode presets, persona-aware visual tuning, beautify enhancement).

    Args:
        sd_url: SD WebUI base URL.
        prompt: positive prompt.
        neg_prompt: negative prompt.
        model: SD checkpoint name (required; empty aborts with an error).
        steps: sampling steps (coerced to int).
        cfg_scale: CFG scale (coerced to float).
        face_swap_on: whether ReActor face swap is enabled.
        face_img: face reference from the UI — may be a PIL.Image, numpy
            array, file path, or None.
        quality_mode: named quality preset passed through to SDService.
        persona_text: optional persona text for visual optimization.
        enhance_level: beautify enhancement strength (coerced to float).

    Returns:
        (images, images, status_msg) — the image list is returned twice for
        two UI outputs; (None, [], error_msg) on failure.
    """
    if not model:
        return None, [], "❌ 未选择 SD 模型"
    try:
        svc = SDService(sd_url)

        # Resolve the face reference when face swap is enabled.
        # Gradio may hand us a PIL.Image / numpy.ndarray / file path / None.
        face_image = None
        if face_swap_on:
            if face_img is not None:
                if isinstance(face_img, Image.Image):
                    face_image = face_img
                elif isinstance(face_img, str) and os.path.isfile(face_img):
                    face_image = Image.open(face_img).convert("RGB")
                else:
                    # numpy array or other formats
                    try:
                        import numpy as np
                        if isinstance(face_img, np.ndarray):
                            face_image = Image.fromarray(face_img).convert("RGB")
                            logger.info("头像从 numpy array 转换为 PIL Image")
                    except Exception as e:
                        logger.warning("头像格式转换失败 (%s): %s", type(face_img).__name__, e)
            # Fall back to the locally stored face image when the UI gave none.
            if face_image is None:
                face_image = SDService.load_face_image()
            if face_image is not None:
                logger.info("换脸头像已就绪: %dx%d", face_image.width, face_image.height)
            else:
                logger.warning("换脸已启用但未找到有效头像")

        persona = _resolve_persona(persona_text) if persona_text else None
        images = svc.txt2img(
            prompt=prompt,
            negative_prompt=neg_prompt,
            model=model,
            steps=int(steps),
            cfg_scale=float(cfg_scale),
            face_image=face_image,
            quality_mode=quality_mode,
            persona=persona,
            enhance_level=float(enhance_level),
        )
        # Fix: dropped the unused `preset = get_sd_preset(quality_mode)` local —
        # its result was never read (the preset is resolved inside txt2img).
        swap_hint = " (已换脸)" if face_image else ""
        return images, images, f"✅ 生成 {len(images)} 张图片{swap_hint} [{quality_mode}]"
    except Exception as e:
        logger.error("图片生成失败: %s", e)
        return None, [], f"❌ 绘图失败: {e}"
|
||
|
||
|
||
def one_click_export(title, content, images):
    """Export the copy text and generated images to a local folder.

    Creates OUTPUT_DIR/<timestamp>_<sanitized-title>/, writes the text as
    "文案.txt" and each PIL image as a JPEG, then best-effort opens the
    folder in the OS file manager.

    Returns:
        A human-readable status string.
    """
    if not title:
        return "❌ 无法导出:没有标题"

    safe_title = re.sub(r'[\\/*?:"<>|]', "", title)[:20]
    folder_path = os.path.join(OUTPUT_DIR, f"{int(time.time())}_{safe_title}")
    os.makedirs(folder_path, exist_ok=True)

    with open(os.path.join(folder_path, "文案.txt"), "w", encoding="utf-8") as fh:
        fh.write(f"{title}\n\n{content}")

    saved_paths = []
    for idx, img in enumerate(images or []):
        # Only PIL images are exported; anything else is skipped silently.
        if not isinstance(img, Image.Image):
            continue
        out_path = os.path.join(folder_path, f"图{idx+1}.jpg")
        if img.mode != "RGB":
            img = img.convert("RGB")  # JPEG cannot store alpha/palette modes
        img.save(out_path, format="JPEG", quality=95)
        saved_paths.append(os.path.abspath(out_path))

    # Best-effort: reveal the export folder in the platform file manager.
    try:
        abs_path = os.path.abspath(folder_path)
        system = platform.system()
        if system == "Windows":
            os.startfile(abs_path)
        elif system == "Darwin":
            subprocess.call(["open", abs_path])
        else:
            subprocess.call(["xdg-open", abs_path])
    except Exception:
        pass

    return f"✅ 已导出至: {folder_path} ({len(saved_paths)} 张图片)"
|
||
|
||
|
||
def publish_to_xhs(title, content, tags_str, images, local_images, mcp_url, schedule_time):
    """Publish a note to Xiaohongshu (XHS) via the MCP client.

    Validates the title length and image count against XHS limits, persists
    AI-generated PIL images to a temp directory so the MCP client can read
    them from disk, and always removes those temp files afterwards.

    Args:
        title: note title (XHS limit: at most 20 characters).
        content: note body text.
        tags_str: comma-separated tag string; leading '#' characters are stripped.
        images: AI-generated images (PIL.Image instances; others are ignored).
        local_images: locally uploaded image files (file objects or path strings).
        mcp_url: MCP server URL.
        schedule_time: optional scheduled publish time string; blank means publish now.

    Returns:
        A human-readable status string (success, or a ❌-prefixed error).
    """
    # === Pre-publish validation ===
    if not title:
        return "❌ 缺少标题"
    if len(title) > 20:
        return f"❌ 标题超长:当前 {len(title)} 字,小红书限制 ≤20 字,请精简后再发布"

    client = get_mcp_client(mcp_url)
    # Temp files written during this call; removed in `finally` regardless of outcome.
    ai_temp_files: list = []

    try:
        # Collect image paths to hand to the MCP client.
        image_paths = []

        # First persist AI-generated images to a temp directory.
        if images:
            temp_dir = os.path.join(OUTPUT_DIR, "_temp_publish")
            os.makedirs(temp_dir, exist_ok=True)
            for idx, img in enumerate(images):
                if isinstance(img, Image.Image):
                    path = os.path.abspath(os.path.join(temp_dir, f"ai_{idx}.jpg"))
                    if img.mode != "RGB":
                        img = img.convert("RGB")  # JPEG cannot store alpha/palette modes
                    img.save(path, format="JPEG", quality=95)
                    image_paths.append(path)
                    ai_temp_files.append(path)  # register for cleanup
        
        # Then append locally uploaded images.
        if local_images:
            for img_file in local_images:
                # Gradio file objects expose .name; plain paths arrive as str.
                img_path = img_file.name if hasattr(img_file, 'name') else str(img_file)
                if os.path.exists(img_path):
                    image_paths.append(os.path.abspath(img_path))

        # === Image validation ===
        if not image_paths:
            return "❌ 至少需要 1 张图片才能发布"
        if len(image_paths) > 18:
            return f"❌ 图片数量超限:当前 {len(image_paths)} 张,小红书限制 ≤18 张,请减少图片"
        for p in image_paths:
            if not os.path.exists(p):
                return f"❌ 图片文件不存在:{p}"

        # Parse tags: split on commas, drop blanks, strip leading '#' characters.
        tags = [t.strip().lstrip("#") for t in tags_str.split(",") if t.strip()] if tags_str else None

        # Scheduled publishing (None means publish immediately).
        schedule = schedule_time if schedule_time and schedule_time.strip() else None

        result = client.publish_content(
            title=title,
            content=content,
            images=image_paths,
            tags=tags,
            schedule_at=schedule,
        )
        if "error" in result:
            return f"❌ 发布失败: {result['error']}"
        return f"✅ 发布成功!\n{result.get('text', '')}"
    except Exception as e:
        logger.error("发布失败: %s", e)
        return f"❌ 发布异常: {e}"
    finally:
        # Remove the AI temp images written above (on success and failure alike).
        for tmp_path in ai_temp_files:
            try:
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)
            except OSError as cleanup_err:
                logger.warning("临时文件清理失败 %s: %s", tmp_path, cleanup_err)
|
||
|
||
|
||
# ========== 批量创作 ==========
|
||
|
||
def batch_generate_copy(
    model: str,
    topics: list[str],
    style: str,
    sd_model_name: str = "",
    persona_text: str = "",
    template_name: str = "",
    publish_queue=None,
) -> tuple[list[dict], str]:
    """Generate copy for multiple topics serially, optionally queueing drafts.

    Args:
        model: LLM model name.
        topics: topic list (at most 10 entries; blank entries are skipped).
        style: writing style (a template may override it).
        sd_model_name: SD model name.
        persona_text: persona text.
        template_name: optional content-template name; the template can
            override the style and contribute preset tags.
        publish_queue: optional PublishQueue instance; successful results
            are enqueued as drafts (best-effort).

    Returns:
        (results_list, status_msg). Failed topics appear in results_list
        as {"batch_index", "topic", "error"} entries.
    """
    if not topics:
        return [], "❌ 请输入至少一个主题"
    if len(topics) > 10:
        return [], "❌ 批量生成最多支持 10 个主题,请减少数量"

    api_key, base_url, _ = _get_llm_config()
    if not api_key:
        return [], "❌ 请先配置并连接 LLM 提供商"

    # Load optional template overrides (style / prompt / preset tags).
    prompt_override = ""
    tags_preset = []
    if template_name:
        try:
            from .content_template import ContentTemplate
            ct = ContentTemplate()
            override = ct.apply_template(template_name)
            style = override.get("style") or style
            # NOTE(review): prompt_override is loaded but never passed onward
            # here — confirm whether generate_copy should receive it.
            prompt_override = override.get("prompt_override", "")
            tags_preset = override.get("tags_preset", [])
        except Exception as e:
            logger.warning("模板加载失败,使用默认参数: %s", e)

    svc = LLMService(api_key, base_url, model)
    persona = _resolve_persona(persona_text) if persona_text else None

    results = []
    success_count = 0
    fail_count = 0

    for idx, topic in enumerate(topics):
        topic = topic.strip()
        if not topic:
            continue  # skip blank entries without producing a result slot

        try:
            data = svc.generate_copy(
                topic, style,
                sd_model_name=sd_model_name,
                persona=persona,
            )

            # Merge template preset tags, de-duplicating against both the
            # generated tags and earlier preset entries.
            tags = data.get("tags", [])
            if tags_preset:
                existing = set(tags)
                for t in tags_preset:
                    if t not in existing:
                        tags.append(t)
                        # Bugfix: track appended tags so a duplicate inside
                        # tags_preset itself is not appended twice.
                        existing.add(t)
                data["tags"] = tags

            data["batch_index"] = idx
            results.append(data)
            success_count += 1

            # Auto-enqueue as a draft (best-effort; failure doesn't abort the batch).
            if publish_queue:
                try:
                    publish_queue.add(
                        title=data.get("title", ""),
                        content=data.get("content", ""),
                        sd_prompt=data.get("sd_prompt", ""),
                        tags=data.get("tags", []),
                        topic=topic,
                        style=style,
                        persona=persona_text if persona_text else "",
                        status="draft",
                    )
                except Exception as e:
                    logger.warning("批量草稿入队失败 #%d: %s", idx, e)

            logger.info("批量生成 %d/%d 完成: %s", idx + 1, len(topics), topic[:20])

        except Exception as e:
            logger.error("批量生成 %d/%d 失败 [%s]: %s", idx + 1, len(topics), topic[:20], e)
            results.append({
                "batch_index": idx,
                "topic": topic,
                "error": str(e),
            })
            fail_count += 1

    status = f"✅ 批量生成完成: {success_count} 成功"
    if fail_count:
        status += f", {fail_count} 失败"
    if publish_queue and success_count:
        status += f" | {success_count} 篇已入草稿队列"

    return results, status
|
||
|
||
|
||
def generate_copy_with_topic_engine(
    model: str,
    style: str,
    sd_model_name: str = "",
    persona_text: str = "",
    count: int = 1,
    hotspot_data: dict = None,
    publish_queue=None,
) -> tuple[list[dict], str]:
    """Auto-select topics with the TopicEngine, then batch-generate copy.

    Args:
        model: LLM model name.
        style: writing style.
        sd_model_name: SD model name.
        persona_text: persona text.
        count: number of posts to generate.
        hotspot_data: optional hotspot-analysis data fed to the engine.
        publish_queue: optional PublishQueue instance.

    Returns:
        (results_list, status_msg); each result additionally carries its
        originating "topic_recommendation" entry from the engine.
    """
    try:
        from .analytics_service import AnalyticsService
        from .topic_engine import TopicEngine

        engine = TopicEngine(AnalyticsService())
        recommendations = engine.recommend_topics(count=count, hotspot_data=hotspot_data)
        if not recommendations:
            return [], "❌ 选题引擎未找到推荐主题,请先进行热点搜索或积累数据"

        results, status = batch_generate_copy(
            model=model,
            topics=[rec["topic"] for rec in recommendations],
            style=style,
            sd_model_name=sd_model_name,
            persona_text=persona_text,
            publish_queue=publish_queue,
        )

        # Attach the originating recommendation to each generated result.
        for item in results:
            pos = item.get("batch_index", -1)
            if 0 <= pos < len(recommendations):
                item["topic_recommendation"] = recommendations[pos]

        return results, status

    except Exception as e:
        logger.error("智能选题生成失败: %s", e)
        return [], f"❌ 智能选题生成失败: {e}"