import gradio as gr import requests import json import base64 import io import os import time import re import shutil import platform import subprocess from PIL import Image # ================= 0. 基础配置与工具 ================= # 强制不走代理连接本地 SD os.environ['NO_PROXY'] = '127.0.0.1,localhost' CONFIG_FILE = "config.json" OUTPUT_DIR = "xhs_workspace" os.makedirs(OUTPUT_DIR, exist_ok=True) class ConfigManager: @staticmethod def load(): if os.path.exists(CONFIG_FILE): try: with open(CONFIG_FILE, 'r', encoding='utf-8') as f: return json.load(f) except: pass return { "api_key": "", "base_url": "https://api.openai.com/v1", "sd_url": "http://127.0.0.1:7860", "model": "gpt-3.5-turbo" } @staticmethod def save(config_data): with open(CONFIG_FILE, 'w', encoding='utf-8') as f: json.dump(config_data, f, indent=4, ensure_ascii=False) # ================= 1. 核心逻辑功能 ================= def get_llm_models(api_key, base_url): if not api_key or not base_url: return gr.update(choices=[]), "⚠️ 请先填写配置" try: url = f"{base_url.rstrip('/')}/models" headers = {"Authorization": f"Bearer {api_key}"} response = requests.get(url, headers=headers, timeout=10) if response.status_code == 200: data = response.json() models = [item['id'] for item in data.get('data', [])] # 保存配置 cfg = ConfigManager.load() cfg['api_key'] = api_key cfg['base_url'] = base_url ConfigManager.save(cfg) # 修复警告:允许自定义值 return gr.update(choices=models, value=models[0] if models else None), f"✅ 已连接,加载 {len(models)} 个模型" return gr.update(), f"❌ 连接失败: {response.status_code}" except Exception as e: return gr.update(), f"❌ 错误: {e}" def generate_copy(api_key, base_url, model, topic, style): if not api_key: return "", "", "", "❌ 缺 API Key" # --- 核心修改:优化了 Prompt,增加字数和违禁词限制 --- system_prompt = """ 你是一个小红书爆款内容专家。请根据用户主题生成内容。 【标题规则】(严格执行): 1. 长度限制:必须控制在 18 字以内(含Emoji),绝对不能超过 20 字! 2. 格式要求:Emoji + 爆点关键词 + 核心痛点。 3. 禁忌:禁止使用“第一”、“最”、“顶级”等绝对化广告法违禁词。 4. 风格:二极管标题(震惊/后悔/必看/避雷/哭了),具有强烈的点击欲望。 【正文规则】: 1. 口语化,多用Emoji,分段清晰,不堆砌长句。 2. 结尾必须有 5 个以上相关话题标签(#)。 【绘图 Prompt】: 生成对应的 Stable Diffusion 英文提示词,强调:masterpiece, best quality, 8k, soft lighting, ins style。 返回 JSON 格式: {"title": "...", "content": "...", "sd_prompt": "..."} """ try: headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} payload = { "model": model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": f"主题:{topic}\n风格:{style}"} ], "response_format": {"type": "json_object"} } resp = requests.post(f"{base_url.rstrip('/')}/chat/completions", headers=headers, json=payload, timeout=60) content = resp.json()['choices'][0]['message']['content'] content = re.sub(r'```json\s*|```', '', content).strip() data = json.loads(content) # --- 双重保险:Python 强制截断 --- title = data.get('title', '') # 如果 LLM 不听话超过了20字,强制截断并保留前19个字+省略号,或者直接保留前20个 if len(title) > 20: title = title[:20] return title, data.get('content', ''), data.get('sd_prompt', ''), "✅ 文案生成完毕" except Exception as e: return "", "", "", f"❌ 生成失败: {e}" def get_sd_models(sd_url): try: resp = requests.get(f"{sd_url}/sdapi/v1/sd-models", timeout=3) if resp.status_code == 200: models = [m['title'] for m in resp.json()] return gr.update(choices=models, value=models[0] if models else None), "✅ SD 已连接" return gr.update(choices=[]), "❌ SD 连接失败" except: return gr.update(choices=[]), "❌ SD 未启动或端口错误" def generate_images(sd_url, prompt, neg_prompt, model, steps, cfg): if not model: return None, "❌ 未选择模型" # 切换模型 try: requests.post(f"{sd_url}/sdapi/v1/options", json={"sd_model_checkpoint": model}) except: pass # 忽略切换错误,继续尝试生成 payload = { "prompt": prompt, "negative_prompt": neg_prompt, "steps": steps, "cfg_scale": cfg, "width": 768, "height": 1024, "batch_size": 2 } try: resp = requests.post(f"{sd_url}/sdapi/v1/txt2img", json=payload, timeout=120) images = [] for i in resp.json()['images']: img = Image.open(io.BytesIO(base64.b64decode(i))) images.append(img) return images, "✅ 图片生成完毕" except Exception as e: return None, f"❌ 绘图失败: {e}" def one_click_export(title, content, images): if not title: return "❌ 无法导出:没有标题" safe_title = re.sub(r'[\\/*?:"<>|]', "", title)[:20] folder_name = f"{int(time.time())}_{safe_title}" folder_path = os.path.join(OUTPUT_DIR, folder_name) os.makedirs(folder_path, exist_ok=True) with open(os.path.join(folder_path, "文案.txt"), "w", encoding="utf-8") as f: f.write(f"{title}\n\n{content}") if images: for idx, img in enumerate(images): img.save(os.path.join(folder_path, f"图{idx+1}.png")) try: if platform.system() == "Windows": os.startfile(folder_path) elif platform.system() == "Darwin": subprocess.call(["open", folder_path]) else: subprocess.call(["xdg-open", folder_path]) return f"✅ 已导出至: {folder_path}" except: return f"✅ 已导出: {folder_path}" # ================= 2. UI 界面构建 ================= cfg = ConfigManager.load() with gr.Blocks(title="小红书全自动工作台", theme=gr.themes.Soft()) as app: gr.Markdown("## 🍒 小红书 AI 爆文生产工坊") state_images = gr.State([]) with gr.Row(): with gr.Column(scale=1): with gr.Accordion("⚙️ 系统设置 (自动保存)", open=True): api_key = gr.Textbox(label="LLM API Key", value=cfg['api_key'], type="password") base_url = gr.Textbox(label="Base URL", value=cfg['base_url']) sd_url = gr.Textbox(label="SD URL", value=cfg['sd_url']) with gr.Row(): btn_connect = gr.Button("🔗 连接并获取模型", size="sm") btn_refresh_sd = gr.Button("🔄 刷新 SD", size="sm") # 修复点 1:允许自定义值,防止报错 llm_model = gr.Dropdown(label="选择 LLM 模型", value=cfg['model'], allow_custom_value=True, interactive=True) sd_model = gr.Dropdown(label="选择 SD 模型", allow_custom_value=True, interactive=True) status_bar = gr.Markdown("等待就绪...") gr.Markdown("### 💡 内容构思") topic = gr.Textbox(label="笔记主题", placeholder="例如:优衣库早春穿搭") style = gr.Dropdown(["好物种草", "干货教程", "情绪共鸣", "生活Vlog"], label="风格", value="好物种草") btn_step1 = gr.Button("✨ 第一步:生成文案方案", variant="primary") with gr.Column(scale=1): gr.Markdown("### 📝 文案确认") # 修复点 2:去掉了 show_copy_button 参数,兼容旧版 Gradio res_title = gr.Textbox(label="标题 (AI生成)", interactive=True) res_content = gr.TextArea(label="正文 (AI生成)", lines=10, interactive=True) res_prompt = gr.TextArea(label="绘图提示词", lines=4, interactive=True) with gr.Accordion("🎨 绘图参数", open=False): neg_prompt = gr.Textbox(label="反向词", value="nsfw, lowres, bad anatomy, text, error") steps = gr.Slider(15, 50, value=25, label="步数") cfg_scale = gr.Slider(1, 15, value=7, label="相关性 (CFG)") btn_step2 = gr.Button("🎨 第二步:开始绘图", variant="primary") with gr.Column(scale=1): gr.Markdown("### 🖼️ 视觉结果") gallery = gr.Gallery(label="生成预览", columns=1, height="auto") btn_export = gr.Button("📂 一键导出 (文案+图片)", variant="stop") export_msg = gr.Markdown("") # ================= 3. 事件绑定 ================= btn_connect.click(fn=get_llm_models, inputs=[api_key, base_url], outputs=[llm_model, status_bar]) btn_refresh_sd.click(fn=get_sd_models, inputs=[sd_url], outputs=[sd_model, status_bar]) btn_step1.click( fn=generate_copy, inputs=[api_key, base_url, llm_model, topic, style], outputs=[res_title, res_content, res_prompt, status_bar] ) def on_img_gen(sd_url, p, np, m, s, c): imgs, msg = generate_images(sd_url, p, np, m, s, c) return imgs, imgs, msg btn_step2.click( fn=on_img_gen, inputs=[sd_url, res_prompt, neg_prompt, sd_model, steps, cfg_scale], outputs=[gallery, state_images, status_bar] ) btn_export.click( fn=one_click_export, inputs=[res_title, res_content, state_images], outputs=[export_msg] ) app.load(fn=get_sd_models, inputs=[sd_url], outputs=[sd_model, status_bar]) if __name__ == "__main__": app.launch(inbrowser=True)