""" services/content.py 文案生成、图片生成、一键导出、发布到小红书 """ import os import re import time import platform import subprocess import logging from PIL import Image from .config_manager import ConfigManager, OUTPUT_DIR from .llm_service import LLMService from .sd_service import SDService, get_sd_preset from .mcp_client import get_mcp_client from .connection import _get_llm_config from .persona import _resolve_persona logger = logging.getLogger("autobot") cfg = ConfigManager() def generate_copy(model, topic, style, sd_model_name, persona_text): """生成文案(自动适配 SD 模型,支持人设,自动注入权重数据)""" api_key, base_url, _ = _get_llm_config() if not api_key: return "", "", "", "", "❌ 请先配置并连接 LLM 提供商" try: svc = LLMService(api_key, base_url, model) persona = _resolve_persona(persona_text) if persona_text else None # 尝试自动注入权重数据(数据闭环 9.1) data = None try: from .analytics_service import AnalyticsService analytics = AnalyticsService() if analytics.has_weights: weight_insights = analytics.weights_summary title_advice = analytics.get_title_advice() hot_tags = ", ".join(analytics.get_top_tags(8)) data = svc.generate_weighted_copy( topic, style, weight_insights=weight_insights, title_advice=title_advice, hot_tags=hot_tags, sd_model_name=sd_model_name, persona=persona, ) logger.info("使用加权文案生成路径(权重数据已注入)") except Exception as e: logger.debug("权重数据注入跳过: %s", e) # 无权重或权重路径失败时,退回基础生成 if data is None: data = svc.generate_copy(topic, style, sd_model_name=sd_model_name, persona=persona) cfg.set("model", model) tags = data.get("tags", []) return ( data.get("title", ""), data.get("content", ""), data.get("sd_prompt", ""), ", ".join(tags) if tags else "", "✅ 文案生成完毕", ) except Exception as e: logger.error("文案生成失败: %s", e) return "", "", "", "", f"❌ 生成失败: {e}" def generate_images(sd_url, prompt, neg_prompt, model, steps, cfg_scale, face_swap_on, face_img, quality_mode, persona_text=None, enhance_level: float = 1.0): """生成图片(可选 ReActor 换脸,支持质量模式预设,支持人设视觉优化,支持美化增强)""" if not model: return None, [], "❌ 未选择 SD 模型" try: svc = SDService(sd_url) # 判断是否启用换脸 face_image = None if face_swap_on: # Gradio 可能传 PIL.Image / numpy.ndarray / 文件路径 / None if face_img is not None: if isinstance(face_img, Image.Image): face_image = face_img elif isinstance(face_img, str) and os.path.isfile(face_img): face_image = Image.open(face_img).convert("RGB") else: # numpy array 等其他格式 try: import numpy as np if isinstance(face_img, np.ndarray): face_image = Image.fromarray(face_img).convert("RGB") logger.info("头像从 numpy array 转换为 PIL Image") except Exception as e: logger.warning("头像格式转换失败 (%s): %s", type(face_img).__name__, e) # 如果 UI 没传有效头像,从本地文件加载 if face_image is None: face_image = SDService.load_face_image() if face_image is not None: logger.info("换脸头像已就绪: %dx%d", face_image.width, face_image.height) else: logger.warning("换脸已启用但未找到有效头像") persona = _resolve_persona(persona_text) if persona_text else None images = svc.txt2img( prompt=prompt, negative_prompt=neg_prompt, model=model, steps=int(steps), cfg_scale=float(cfg_scale), face_image=face_image, quality_mode=quality_mode, persona=persona, enhance_level=float(enhance_level), ) preset = get_sd_preset(quality_mode) swap_hint = " (已换脸)" if face_image else "" return images, images, f"✅ 生成 {len(images)} 张图片{swap_hint} [{quality_mode}]" except Exception as e: logger.error("图片生成失败: %s", e) return None, [], f"❌ 绘图失败: {e}" def one_click_export(title, content, images): """导出文案和图片到本地""" if not title: return "❌ 无法导出:没有标题" safe_title = re.sub(r'[\\/*?:"<>|]', "", title)[:20] folder_name = f"{int(time.time())}_{safe_title}" folder_path = os.path.join(OUTPUT_DIR, folder_name) os.makedirs(folder_path, exist_ok=True) with open(os.path.join(folder_path, "文案.txt"), "w", encoding="utf-8") as f: f.write(f"{title}\n\n{content}") saved_paths = [] if images: for idx, img in enumerate(images): path = os.path.join(folder_path, f"图{idx+1}.jpg") if isinstance(img, Image.Image): if img.mode != "RGB": img = img.convert("RGB") img.save(path, format="JPEG", quality=95) saved_paths.append(os.path.abspath(path)) # 尝试打开文件夹 try: abs_path = os.path.abspath(folder_path) if platform.system() == "Windows": os.startfile(abs_path) elif platform.system() == "Darwin": subprocess.call(["open", abs_path]) else: subprocess.call(["xdg-open", abs_path]) except Exception: pass return f"✅ 已导出至: {folder_path} ({len(saved_paths)} 张图片)" def publish_to_xhs(title, content, tags_str, images, local_images, mcp_url, schedule_time): """通过 MCP 发布到小红书(含输入校验和临时文件自动清理)""" # === 发布前校验 === if not title: return "❌ 缺少标题" if len(title) > 20: return f"❌ 标题超长:当前 {len(title)} 字,小红书限制 ≤20 字,请精简后再发布" client = get_mcp_client(mcp_url) ai_temp_files: list = [] # 追踪本次写入的临时文件,用于 finally 清理 try: # 收集图片路径 image_paths = [] # 先保存 AI 生成的图片到临时目录 if images: temp_dir = os.path.join(OUTPUT_DIR, "_temp_publish") os.makedirs(temp_dir, exist_ok=True) for idx, img in enumerate(images): if isinstance(img, Image.Image): path = os.path.abspath(os.path.join(temp_dir, f"ai_{idx}.jpg")) if img.mode != "RGB": img = img.convert("RGB") img.save(path, format="JPEG", quality=95) image_paths.append(path) ai_temp_files.append(path) # 登记临时文件 # 添加本地上传的图片 if local_images: for img_file in local_images: img_path = img_file.name if hasattr(img_file, 'name') else str(img_file) if os.path.exists(img_path): image_paths.append(os.path.abspath(img_path)) # === 图片校验 === if not image_paths: return "❌ 至少需要 1 张图片才能发布" if len(image_paths) > 18: return f"❌ 图片数量超限:当前 {len(image_paths)} 张,小红书限制 ≤18 张,请减少图片" for p in image_paths: if not os.path.exists(p): return f"❌ 图片文件不存在:{p}" # 解析标签 tags = [t.strip().lstrip("#") for t in tags_str.split(",") if t.strip()] if tags_str else None # 定时发布 schedule = schedule_time if schedule_time and schedule_time.strip() else None result = client.publish_content( title=title, content=content, images=image_paths, tags=tags, schedule_at=schedule, ) if "error" in result: return f"❌ 发布失败: {result['error']}" return f"✅ 发布成功!\n{result.get('text', '')}" except Exception as e: logger.error("发布失败: %s", e) return f"❌ 发布异常: {e}" finally: # 清理本次写入的 AI 临时图片(无论成功/失败) for tmp_path in ai_temp_files: try: if os.path.exists(tmp_path): os.remove(tmp_path) except OSError as cleanup_err: logger.warning("临时文件清理失败 %s: %s", tmp_path, cleanup_err) # ========== 批量创作 ========== def batch_generate_copy( model: str, topics: list[str], style: str, sd_model_name: str = "", persona_text: str = "", template_name: str = "", publish_queue=None, ) -> tuple[list[dict], str]: """ 批量生成多篇文案(串行),自动插入发布队列草稿 Args: model: LLM 模型名 topics: 主题列表 (最多 10 个) style: 写作风格 sd_model_name: SD 模型名 persona_text: 人设文本 template_name: 可选的模板名 publish_queue: 可选的 PublishQueue 实例 Returns: (results_list, status_msg) """ if not topics: return [], "❌ 请输入至少一个主题" if len(topics) > 10: return [], "❌ 批量生成最多支持 10 个主题,请减少数量" api_key, base_url, _ = _get_llm_config() if not api_key: return [], "❌ 请先配置并连接 LLM 提供商" # 加载模板覆盖 prompt_override = "" tags_preset = [] if template_name: try: from .content_template import ContentTemplate ct = ContentTemplate() override = ct.apply_template(template_name) style = override.get("style") or style prompt_override = override.get("prompt_override", "") tags_preset = override.get("tags_preset", []) except Exception as e: logger.warning("模板加载失败,使用默认参数: %s", e) svc = LLMService(api_key, base_url, model) persona = _resolve_persona(persona_text) if persona_text else None results = [] success_count = 0 fail_count = 0 for idx, topic in enumerate(topics): topic = topic.strip() if not topic: continue try: data = svc.generate_copy( topic, style, sd_model_name=sd_model_name, persona=persona, ) # 如有模板 prompt_override,它已通过风格参数间接生效 # 合并模板标签 tags = data.get("tags", []) if tags_preset: existing = set(tags) for t in tags_preset: if t not in existing: tags.append(t) data["tags"] = tags data["batch_index"] = idx results.append(data) success_count += 1 # 自动入队为草稿 if publish_queue: try: publish_queue.add( title=data.get("title", ""), content=data.get("content", ""), sd_prompt=data.get("sd_prompt", ""), tags=data.get("tags", []), topic=topic, style=style, persona=persona_text if persona_text else "", status="draft", ) except Exception as e: logger.warning("批量草稿入队失败 #%d: %s", idx, e) logger.info("批量生成 %d/%d 完成: %s", idx + 1, len(topics), topic[:20]) except Exception as e: logger.error("批量生成 %d/%d 失败 [%s]: %s", idx + 1, len(topics), topic[:20], e) results.append({ "batch_index": idx, "topic": topic, "error": str(e), }) fail_count += 1 status = f"✅ 批量生成完成: {success_count} 成功" if fail_count: status += f", {fail_count} 失败" if publish_queue and success_count: status += f" | {success_count} 篇已入草稿队列" return results, status def generate_copy_with_topic_engine( model: str, style: str, sd_model_name: str = "", persona_text: str = "", count: int = 1, hotspot_data: dict = None, publish_queue=None, ) -> tuple[list[dict], str]: """ 使用智能选题引擎自动选题 + 生成文案 Args: model: LLM 模型名 style: 写作风格 sd_model_name: SD 模型名 persona_text: 人设文本 count: 生成篇数 hotspot_data: 可选的热点分析数据 publish_queue: 可选的 PublishQueue 实例 Returns: (results_list, status_msg) """ try: from .analytics_service import AnalyticsService from .topic_engine import TopicEngine analytics = AnalyticsService() engine = TopicEngine(analytics) recommendations = engine.recommend_topics(count=count, hotspot_data=hotspot_data) if not recommendations: return [], "❌ 选题引擎未找到推荐主题,请先进行热点搜索或积累数据" topics = [r["topic"] for r in recommendations] results, status = batch_generate_copy( model=model, topics=topics, style=style, sd_model_name=sd_model_name, persona_text=persona_text, publish_queue=publish_queue, ) # 把选题推荐信息附加到结果 for result in results: idx = result.get("batch_index", -1) if 0 <= idx < len(recommendations): result["topic_recommendation"] = recommendations[idx] return results, status except Exception as e: logger.error("智能选题生成失败: %s", e) return [], f"❌ 智能选题生成失败: {e}"