""" services/content.py 文案生成、图片生成、一键导出、发布到小红书 """ import os import re import time import platform import subprocess import logging from PIL import Image from config_manager import ConfigManager, OUTPUT_DIR from llm_service import LLMService from sd_service import SDService, get_sd_preset from mcp_client import get_mcp_client from services.connection import _get_llm_config from services.persona import _resolve_persona logger = logging.getLogger("autobot") cfg = ConfigManager() def generate_copy(model, topic, style, sd_model_name, persona_text): """生成文案(自动适配 SD 模型的 prompt 风格,支持人设)""" api_key, base_url, _ = _get_llm_config() if not api_key: return "", "", "", "", "❌ 请先配置并连接 LLM 提供商" try: svc = LLMService(api_key, base_url, model) persona = _resolve_persona(persona_text) if persona_text else None data = svc.generate_copy(topic, style, sd_model_name=sd_model_name, persona=persona) cfg.set("model", model) tags = data.get("tags", []) return ( data.get("title", ""), data.get("content", ""), data.get("sd_prompt", ""), ", ".join(tags) if tags else "", "✅ 文案生成完毕", ) except Exception as e: logger.error("文案生成失败: %s", e) return "", "", "", "", f"❌ 生成失败: {e}" def generate_images(sd_url, prompt, neg_prompt, model, steps, cfg_scale, face_swap_on, face_img, quality_mode, persona_text=None): """生成图片(可选 ReActor 换脸,支持质量模式预设,支持人设视觉优化)""" if not model: return None, [], "❌ 未选择 SD 模型" try: svc = SDService(sd_url) # 判断是否启用换脸 face_image = None if face_swap_on: # Gradio 可能传 PIL.Image / numpy.ndarray / 文件路径 / None if face_img is not None: if isinstance(face_img, Image.Image): face_image = face_img elif isinstance(face_img, str) and os.path.isfile(face_img): face_image = Image.open(face_img).convert("RGB") else: # numpy array 等其他格式 try: import numpy as np if isinstance(face_img, np.ndarray): face_image = Image.fromarray(face_img).convert("RGB") logger.info("头像从 numpy array 转换为 PIL Image") except Exception as e: logger.warning("头像格式转换失败 (%s): %s", type(face_img).__name__, e) # 如果 UI 没传有效头像,从本地文件加载 if face_image is None: face_image = SDService.load_face_image() if face_image is not None: logger.info("换脸头像已就绪: %dx%d", face_image.width, face_image.height) else: logger.warning("换脸已启用但未找到有效头像") persona = _resolve_persona(persona_text) if persona_text else None images = svc.txt2img( prompt=prompt, negative_prompt=neg_prompt, model=model, steps=int(steps), cfg_scale=float(cfg_scale), face_image=face_image, quality_mode=quality_mode, persona=persona, ) preset = get_sd_preset(quality_mode) swap_hint = " (已换脸)" if face_image else "" return images, images, f"✅ 生成 {len(images)} 张图片{swap_hint} [{quality_mode}]" except Exception as e: logger.error("图片生成失败: %s", e) return None, [], f"❌ 绘图失败: {e}" def one_click_export(title, content, images): """导出文案和图片到本地""" if not title: return "❌ 无法导出:没有标题" safe_title = re.sub(r'[\\/*?:"<>|]', "", title)[:20] folder_name = f"{int(time.time())}_{safe_title}" folder_path = os.path.join(OUTPUT_DIR, folder_name) os.makedirs(folder_path, exist_ok=True) with open(os.path.join(folder_path, "文案.txt"), "w", encoding="utf-8") as f: f.write(f"{title}\n\n{content}") saved_paths = [] if images: for idx, img in enumerate(images): path = os.path.join(folder_path, f"图{idx+1}.jpg") if isinstance(img, Image.Image): if img.mode != "RGB": img = img.convert("RGB") img.save(path, format="JPEG", quality=95) saved_paths.append(os.path.abspath(path)) # 尝试打开文件夹 try: abs_path = os.path.abspath(folder_path) if platform.system() == "Windows": os.startfile(abs_path) elif platform.system() == "Darwin": subprocess.call(["open", abs_path]) else: subprocess.call(["xdg-open", abs_path]) except Exception: pass return f"✅ 已导出至: {folder_path} ({len(saved_paths)} 张图片)" def publish_to_xhs(title, content, tags_str, images, local_images, mcp_url, schedule_time): """通过 MCP 发布到小红书(含输入校验和临时文件自动清理)""" # === 发布前校验 === if not title: return "❌ 缺少标题" if len(title) > 20: return f"❌ 标题超长:当前 {len(title)} 字,小红书限制 ≤20 字,请精简后再发布" client = get_mcp_client(mcp_url) ai_temp_files: list = [] # 追踪本次写入的临时文件,用于 finally 清理 try: # 收集图片路径 image_paths = [] # 先保存 AI 生成的图片到临时目录 if images: temp_dir = os.path.join(OUTPUT_DIR, "_temp_publish") os.makedirs(temp_dir, exist_ok=True) for idx, img in enumerate(images): if isinstance(img, Image.Image): path = os.path.abspath(os.path.join(temp_dir, f"ai_{idx}.jpg")) if img.mode != "RGB": img = img.convert("RGB") img.save(path, format="JPEG", quality=95) image_paths.append(path) ai_temp_files.append(path) # 登记临时文件 # 添加本地上传的图片 if local_images: for img_file in local_images: img_path = img_file.name if hasattr(img_file, 'name') else str(img_file) if os.path.exists(img_path): image_paths.append(os.path.abspath(img_path)) # === 图片校验 === if not image_paths: return "❌ 至少需要 1 张图片才能发布" if len(image_paths) > 18: return f"❌ 图片数量超限:当前 {len(image_paths)} 张,小红书限制 ≤18 张,请减少图片" for p in image_paths: if not os.path.exists(p): return f"❌ 图片文件不存在:{p}" # 解析标签 tags = [t.strip().lstrip("#") for t in tags_str.split(",") if t.strip()] if tags_str else None # 定时发布 schedule = schedule_time if schedule_time and schedule_time.strip() else None result = client.publish_content( title=title, content=content, images=image_paths, tags=tags, schedule_at=schedule, ) if "error" in result: return f"❌ 发布失败: {result['error']}" return f"✅ 发布成功!\n{result.get('text', '')}" except Exception as e: logger.error("发布失败: %s", e) return f"❌ 发布异常: {e}" finally: # 清理本次写入的 AI 临时图片(无论成功/失败) for tmp_path in ai_temp_files: try: if os.path.exists(tmp_path): os.remove(tmp_path) except OSError as cleanup_err: logger.warning("临时文件清理失败 %s: %s", tmp_path, cleanup_err)