From 5ee26cb78257ab94b5cfede0a6413f06c3a4d443 Mon Sep 17 00:00:00 2001 From: zhoujie <929834232@qq.com> Date: Tue, 10 Feb 2026 21:57:26 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(publish):=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E5=86=85=E5=AE=B9=E5=8F=91=E5=B8=83=E9=98=9F=E5=88=97=E4=B8=8E?= =?UTF-8?q?=E6=8E=92=E6=9C=9F=E7=B3=BB=E7=BB=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增发布队列模块 `publish_queue.py`,实现基于 SQLite 的持久化队列管理 - 支持内容草稿、审核、排期、自动发布、失败重试等全流程状态管理 - 新增批量生成到队列功能,支持智能权重选择和主题池随机选取 - 新增队列后台处理器,支持定时检查和自动发布已排期内容 - 新增内容排期日历视图,可视化展示近期的发布计划 - 新增队列管理界面,包含状态筛选、单项操作和批量审核功能 - 新增发布回调机制,将队列项数据发布到小红书平台 - 新增配置项 `use_smart_weights` 控制是否启用智能权重生成 - 更新主界面,新增第八个标签页“内容排期”,集成所有队列相关功能 --- config.json | 3 +- main.py | 477 ++++++++++++++++++++++++++++++++++++++ publish_queue.py | 583 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 1062 insertions(+), 1 deletion(-) create mode 100644 publish_queue.py diff --git a/config.json b/config.json index 0f981bb..e158f48 100644 --- a/config.json +++ b/config.json @@ -21,5 +21,6 @@ "base_url": "https://wolfai.top/v1" } ], - "xsec_token": "ABfkw0sdbz9Lf-js1d83biryHO6o13nCCPwPbVK6eGYR8=" + "use_smart_weights": true, + "xsec_token": "AB1StlX7ffxsEkfyNuTFDesPlV2g1haPcYuh1-AkYcQxo=" } \ No newline at end of file diff --git a/main.py b/main.py index 7e919e7..373e4d9 100644 --- a/main.py +++ b/main.py @@ -57,6 +57,16 @@ cfg.ensure_workspace() mcp = get_mcp_client(cfg.get("mcp_url", "http://localhost:18060/mcp")) analytics = AnalyticsService(OUTPUT_DIR) +# ================= 发布队列 ================= +from publish_queue import ( + PublishQueue, QueuePublisher, + STATUS_DRAFT, STATUS_APPROVED, STATUS_SCHEDULED, STATUS_PUBLISHING, + STATUS_PUBLISHED, STATUS_FAILED, STATUS_REJECTED, STATUS_LABELS, +) + +pub_queue = PublishQueue(OUTPUT_DIR) +queue_publisher = QueuePublisher(pub_queue) + # ================================================== # LLM 多提供商管理 # ================================================== @@ -2130,6 +2140,301 @@ def auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model, per return f"❌ 发布失败: {e}" +# ================================================== +# 发布队列相关函数 +# ================================================== + +def generate_to_queue(topics_str, sd_url_val, sd_model_name, model, persona_text=None, + quality_mode_val=None, face_swap_on=False, count=1, + scheduled_time=None): + """批量生成内容 → 加入发布队列(不直接发布)""" + try: + topics = [t.strip() for t in topics_str.split(",") if t.strip()] if topics_str else DEFAULT_TOPICS + use_weights = cfg.get("use_smart_weights", True) and analytics.has_weights + + api_key, base_url, _ = _get_llm_config() + if not api_key: + return "❌ LLM 未配置" + if not sd_url_val or not sd_model_name: + return "❌ SD WebUI 未连接或未选择模型" + + count = max(1, min(int(count), 10)) + results = [] + + for i in range(count): + try: + _auto_log_append(f"📋 [队列生成] 正在生成第 {i+1}/{count} 篇...") + + if use_weights: + topic = analytics.get_weighted_topic(topics) + style = analytics.get_weighted_style(DEFAULT_STYLES) + else: + topic = random.choice(topics) + style = random.choice(DEFAULT_STYLES) + + svc = LLMService(api_key, base_url, model) + persona = _resolve_persona(persona_text) if persona_text else None + + if use_weights: + weight_insights = f"高权重主题: {', '.join(list(analytics._weights.get('topic_weights', {}).keys())[:5])}\n" + weight_insights += f"权重摘要: {analytics.weights_summary}" + title_advice = analytics.get_title_advice() + hot_tags = ", ".join(analytics.get_top_tags(8)) + try: + data = svc.generate_weighted_copy(topic, style, weight_insights, title_advice, hot_tags, sd_model_name=sd_model_name, persona=persona) + except Exception: + data = svc.generate_copy(topic, style, sd_model_name=sd_model_name, persona=persona) + else: + data = svc.generate_copy(topic, style, sd_model_name=sd_model_name, persona=persona) + + title = (data.get("title", "") or "")[:20] + content = data.get("content", "") + sd_prompt = data.get("sd_prompt", "") + tags = data.get("tags", []) + + if use_weights: + top_tags = analytics.get_top_tags(5) + for t in top_tags: + if t not in tags: + tags.append(t) + tags = tags[:10] + + if not title: + _auto_log_append(f"⚠️ 第 {i+1} 篇文案生成失败,跳过") + continue + + # 生成图片 + sd_svc = SDService(sd_url_val) + face_image = None + if face_swap_on: + face_image = SDService.load_face_image() + images = sd_svc.txt2img(prompt=sd_prompt, model=sd_model_name, + face_image=face_image, + quality_mode=quality_mode_val or "快速 (约30秒)") + if not images: + _auto_log_append(f"⚠️ 第 {i+1} 篇图片生成失败,跳过") + continue + + # 保存备份 + ts = int(time.time()) + safe_title = re.sub(r'[\\/*?:"<>|]', "", title)[:20] + backup_dir = os.path.join(OUTPUT_DIR, f"{ts}_{safe_title}") + os.makedirs(backup_dir, exist_ok=True) + + with open(os.path.join(backup_dir, "文案.txt"), "w", encoding="utf-8") as f: + f.write(f"标题: {title}\n风格: {style}\n主题: {topic}\n\n{content}\n\n标签: {', '.join(tags)}\n\nSD Prompt: {sd_prompt}") + + image_paths = [] + for idx, img in enumerate(images): + if isinstance(img, Image.Image): + path = os.path.abspath(os.path.join(backup_dir, f"图{idx+1}.jpg")) + if img.mode != "RGB": + img = img.convert("RGB") + img.save(path, format="JPEG", quality=95) + image_paths.append(path) + + if not image_paths: + continue + + # 加入队列 + item_id = pub_queue.add( + title=title, content=content, sd_prompt=sd_prompt, + tags=tags, image_paths=image_paths, backup_dir=backup_dir, + topic=topic, style=style, persona=persona or "", + status=STATUS_DRAFT, scheduled_time=scheduled_time, + ) + results.append(f"#{item_id} {title}") + _auto_log_append(f"📋 已加入队列 #{item_id}: {title}") + + # 多篇间隔 + if i < count - 1: + time.sleep(2) + + except Exception as e: + _auto_log_append(f"⚠️ 第 {i+1} 篇生成异常: {e}") + continue + + if not results: + return "❌ 所有内容生成失败,请检查配置" + + return f"✅ 已生成 {len(results)} 篇内容加入队列:\n" + "\n".join(f" - {r}" for r in results) + + except Exception as e: + return f"❌ 批量生成异常: {e}" + + +def _queue_publish_callback(item: dict) -> tuple[bool, str]: + """队列发布回调: 从队列项数据发布到小红书""" + try: + mcp_url = cfg.get("mcp_url", "http://localhost:18060/mcp") + client = get_mcp_client(mcp_url) + title = item.get("title", "") + content = item.get("content", "") + image_paths = item.get("image_paths", []) + tags = item.get("tags", []) + + if not title or not image_paths: + return False, "标题或图片缺失" + + # 验证图片文件存在 + valid_paths = [p for p in image_paths if os.path.isfile(p)] + if not valid_paths: + return False, "所有图片文件不存在" + + result = client.publish_content( + title=title, content=content, images=valid_paths, tags=tags, + ) + if "error" in result: + return False, result["error"] + + _increment_stat("publishes") + _clear_error_streak() + return True, result.get("text", "发布成功") + + except Exception as e: + return False, str(e) + + +# 注册发布回调 +queue_publisher.set_publish_callback(_queue_publish_callback) +queue_publisher.set_log_callback(_auto_log_append) + + +def queue_refresh_table(status_filter): + """刷新队列表格""" + statuses = None + if status_filter and status_filter != "全部": + status_map = {v: k for k, v in STATUS_LABELS.items()} + if status_filter in status_map: + statuses = [status_map[status_filter]] + return pub_queue.format_queue_table(statuses) + + +def queue_refresh_calendar(): + """刷新日历视图""" + return pub_queue.format_calendar(14) + + +def queue_preview_item(item_id_str): + """预览队列项""" + try: + item_id = int(str(item_id_str).strip().replace("#", "")) + return pub_queue.format_preview(item_id) + except (ValueError, TypeError): + return "❌ 请输入有效的队列项 ID(数字)" + + +def queue_approve_item(item_id_str, scheduled_time_str): + """审核通过""" + try: + item_id = int(str(item_id_str).strip().replace("#", "")) + sched = scheduled_time_str.strip() if scheduled_time_str else None + ok = pub_queue.approve(item_id, scheduled_time=sched) + if ok: + status = "已排期" if sched else "待发布" + return f"✅ #{item_id} 已审核通过 → {status}" + return f"❌ #{item_id} 无法审核(可能不是草稿/失败状态)" + except (ValueError, TypeError): + return "❌ 请输入有效的 ID" + + +def queue_reject_item(item_id_str): + """拒绝队列项""" + try: + item_id = int(str(item_id_str).strip().replace("#", "")) + ok = pub_queue.reject(item_id) + return f"✅ #{item_id} 已拒绝" if ok else f"❌ #{item_id} 无法拒绝" + except (ValueError, TypeError): + return "❌ 请输入有效的 ID" + + +def queue_delete_item(item_id_str): + """删除队列项""" + try: + item_id = int(str(item_id_str).strip().replace("#", "")) + ok = pub_queue.delete(item_id) + return f"✅ #{item_id} 已删除" if ok else f"❌ #{item_id} 无法删除(可能正在发布中)" + except (ValueError, TypeError): + return "❌ 请输入有效的 ID" + + +def queue_retry_item(item_id_str): + """重试失败项""" + try: + item_id = int(str(item_id_str).strip().replace("#", "")) + ok = pub_queue.retry(item_id) + return f"✅ #{item_id} 已重新加入待发布" if ok else f"❌ #{item_id} 无法重试(不是失败状态)" + except (ValueError, TypeError): + return "❌ 请输入有效的 ID" + + +def queue_publish_now(item_id_str): + """立即发布队列项""" + try: + item_id = int(str(item_id_str).strip().replace("#", "")) + return queue_publisher.publish_now(item_id) + except (ValueError, TypeError): + return "❌ 请输入有效的 ID" + + +def queue_start_processor(): + """启动队列后台处理器""" + if queue_publisher.is_running: + return "⚠️ 队列处理器已在运行中" + queue_publisher.start(check_interval=60) + return "✅ 队列处理器已启动,每分钟检查待发布项" + + +def queue_stop_processor(): + """停止队列后台处理器""" + if not queue_publisher.is_running: + return "⚠️ 队列处理器未在运行" + queue_publisher.stop() + return "🛑 队列处理器已停止" + + +def queue_get_status(): + """获取队列状态摘要""" + counts = pub_queue.count_by_status() + running = "🟢 运行中" if queue_publisher.is_running else "⚪ 未启动" + parts = [f"**队列处理器**: {running}"] + for s, label in STATUS_LABELS.items(): + cnt = counts.get(s, 0) + if cnt > 0: + parts.append(f"{label}: {cnt}") + total = sum(counts.values()) + parts.append(f"**合计**: {total} 项") + return " · ".join(parts) + + +def queue_batch_approve(status_filter): + """批量审核通过所有草稿""" + items = pub_queue.list_by_status([STATUS_DRAFT]) + if not items: + return "📭 没有待审核的草稿" + approved = 0 + for item in items: + if pub_queue.approve(item["id"]): + approved += 1 + return f"✅ 已批量审核通过 {approved} 项" + + +def queue_generate_and_refresh(topics_str, sd_url_val, sd_model_name, model, + persona_text, quality_mode_val, face_swap_on, + gen_count, gen_schedule_time): + """生成内容到队列 + 刷新表格""" + msg = generate_to_queue( + topics_str, sd_url_val, sd_model_name, model, + persona_text=persona_text, quality_mode_val=quality_mode_val, + face_swap_on=face_swap_on, count=gen_count, + scheduled_time=gen_schedule_time.strip() if gen_schedule_time else None, + ) + table = pub_queue.format_queue_table() + calendar = pub_queue.format_calendar(14) + status = queue_get_status() + return msg, table, calendar, status + + # 调度器下次执行时间追踪 _scheduler_next_times = {} @@ -3409,6 +3714,100 @@ with gr.Blocks( value=_get_stats_summary(), ) + # -------- Tab 8: 内容排期 📅 -------- + with gr.Tab("📅 内容排期"): + gr.Markdown( + "### 📅 内容排期日历 + 发布队列\n" + "> 批量生成内容 → 预览审核 → 排期定时 → 自动发布,内容创作全流程管控\n\n" + "**工作流**: 生成内容 → 📝草稿 → ✅审核通过 → 🕐排期/立即发布 → 🚀自动发布" + ) + + with gr.Row(): + # ===== 左栏: 生成 & 队列控制 ===== + with gr.Column(scale=1): + gr.Markdown("#### 🔧 批量生成到队列") + queue_gen_topics = gr.Textbox( + label="主题池 (逗号分隔,随人设自动切换)", + value=", ".join(get_persona_topics(config.get("persona", ""))), + placeholder="会从池中随机选取,切换人设自动更新", + ) + with gr.Row(): + queue_gen_count = gr.Number( + label="生成数量", value=3, minimum=1, maximum=10, + ) + queue_gen_schedule = gr.Textbox( + label="排期时间 (可选)", + placeholder="如 2026-02-10 18:00:00,留空=仅草稿", + ) + btn_queue_generate = gr.Button( + "📝 批量生成 → 加入队列", variant="primary", size="lg", + ) + queue_gen_result = gr.Markdown("") + + gr.Markdown("---") + gr.Markdown("#### ⚙️ 队列处理器") + queue_processor_status = gr.Markdown( + value=queue_get_status(), + ) + with gr.Row(): + btn_queue_start = gr.Button( + "▶️ 启动队列处理", variant="primary", + ) + btn_queue_stop = gr.Button( + "⏹️ 停止队列处理", variant="stop", + ) + queue_processor_result = gr.Markdown("") + + gr.Markdown("---") + gr.Markdown("#### 🔍 操作单个队列项") + queue_item_id = gr.Textbox( + label="队列项 ID", placeholder="输入 # 号,如 1", + ) + with gr.Row(): + btn_queue_preview = gr.Button("👁️ 预览", size="sm") + btn_queue_approve = gr.Button("✅ 通过", size="sm", variant="primary") + btn_queue_reject = gr.Button("🚫 拒绝", size="sm") + with gr.Row(): + btn_queue_publish_now = gr.Button("🚀 立即发布", size="sm", variant="primary") + btn_queue_retry = gr.Button("🔄 重试", size="sm") + btn_queue_delete = gr.Button("🗑️ 删除", size="sm", variant="stop") + queue_schedule_time = gr.Textbox( + label="排期时间 (审核通过时可指定)", + placeholder="如 2026-02-10 20:00:00,留空=立即待发布", + ) + btn_queue_batch_approve = gr.Button( + "✅ 批量通过所有草稿", variant="secondary", + ) + queue_op_result = gr.Markdown("") + + # ===== 右栏: 队列列表 & 日历 ===== + with gr.Column(scale=2): + gr.Markdown("#### 📋 发布队列") + with gr.Row(): + queue_filter = gr.Dropdown( + label="状态筛选", + choices=["全部"] + list(STATUS_LABELS.values()), + value="全部", + ) + btn_queue_refresh = gr.Button("🔄 刷新", size="sm") + queue_table = gr.Markdown( + value=pub_queue.format_queue_table(), + label="队列列表", + ) + + gr.Markdown("---") + gr.Markdown("#### 📅 排期日历") + queue_calendar = gr.Markdown( + value=pub_queue.format_calendar(14), + label="日历视图", + ) + + gr.Markdown("---") + gr.Markdown("#### 👁️ 内容预览") + queue_preview_display = gr.Markdown( + value="*选择队列项 ID 后点击预览*", + ) + # ================================================== # 事件绑定 # ================================================== @@ -3732,6 +4131,84 @@ with gr.Blocks( outputs=[autostart_status], ) + # ---- Tab 8: 内容排期 ---- + # 人设切换 → 联动队列的主题池 + persona.change( + fn=lambda p: ", ".join(get_persona_topics(p)), + inputs=[persona], + outputs=[queue_gen_topics], + ) + + # 批量生成到队列 + btn_queue_generate.click( + fn=queue_generate_and_refresh, + inputs=[queue_gen_topics, sd_url, sd_model, llm_model, + persona, quality_mode, face_swap_toggle, + queue_gen_count, queue_gen_schedule], + outputs=[queue_gen_result, queue_table, queue_calendar, queue_processor_status], + ) + + # 刷新队列 + btn_queue_refresh.click( + fn=lambda sf: (queue_refresh_table(sf), queue_refresh_calendar(), queue_get_status()), + inputs=[queue_filter], + outputs=[queue_table, queue_calendar, queue_processor_status], + ) + queue_filter.change( + fn=lambda sf: queue_refresh_table(sf), + inputs=[queue_filter], + outputs=[queue_table], + ) + + # 单项操作 + btn_queue_preview.click( + fn=queue_preview_item, + inputs=[queue_item_id], + outputs=[queue_preview_display], + ) + btn_queue_approve.click( + fn=lambda iid, st: (queue_approve_item(iid, st), pub_queue.format_queue_table(), pub_queue.format_calendar(14)), + inputs=[queue_item_id, queue_schedule_time], + outputs=[queue_op_result, queue_table, queue_calendar], + ) + btn_queue_reject.click( + fn=lambda iid: (queue_reject_item(iid), pub_queue.format_queue_table()), + inputs=[queue_item_id], + outputs=[queue_op_result, queue_table], + ) + btn_queue_delete.click( + fn=lambda iid: (queue_delete_item(iid), pub_queue.format_queue_table(), pub_queue.format_calendar(14)), + inputs=[queue_item_id], + outputs=[queue_op_result, queue_table, queue_calendar], + ) + btn_queue_retry.click( + fn=lambda iid: (queue_retry_item(iid), pub_queue.format_queue_table()), + inputs=[queue_item_id], + outputs=[queue_op_result, queue_table], + ) + btn_queue_publish_now.click( + fn=lambda iid: (queue_publish_now(iid), pub_queue.format_queue_table(), pub_queue.format_calendar(14), queue_get_status()), + inputs=[queue_item_id], + outputs=[queue_op_result, queue_table, queue_calendar, queue_processor_status], + ) + btn_queue_batch_approve.click( + fn=lambda sf: (queue_batch_approve(sf), pub_queue.format_queue_table(), pub_queue.format_calendar(14)), + inputs=[queue_filter], + outputs=[queue_op_result, queue_table, queue_calendar], + ) + + # 队列处理器 + btn_queue_start.click( + fn=lambda: (queue_start_processor(), queue_get_status()), + inputs=[], + outputs=[queue_processor_result, queue_processor_status], + ) + btn_queue_stop.click( + fn=lambda: (queue_stop_processor(), queue_get_status()), + inputs=[], + outputs=[queue_processor_result, queue_processor_status], + ) + # ---- 启动时自动刷新 SD ---- app.load(fn=connect_sd, inputs=[sd_url], outputs=[sd_model, status_bar]) diff --git a/publish_queue.py b/publish_queue.py new file mode 100644 index 0000000..36b4e96 --- /dev/null +++ b/publish_queue.py @@ -0,0 +1,583 @@ +""" +发布队列模块 +SQLite 持久化的内容排期 + 发布队列,支持草稿预审、定时发布、失败重试 +""" +import sqlite3 +import json +import os +import time +import logging +import threading +from datetime import datetime, timedelta +from typing import Optional + +logger = logging.getLogger(__name__) + +# 队列项状态 +STATUS_DRAFT = "draft" # 草稿 — 待审核 +STATUS_APPROVED = "approved" # 已审核 — 待排期或立即可发布 +STATUS_SCHEDULED = "scheduled" # 已排期 — 定时发布 +STATUS_PUBLISHING = "publishing" # 发布中 +STATUS_PUBLISHED = "published" # 已发布 +STATUS_FAILED = "failed" # 发布失败 +STATUS_REJECTED = "rejected" # 已拒绝/丢弃 + +ALL_STATUSES = [STATUS_DRAFT, STATUS_APPROVED, STATUS_SCHEDULED, + STATUS_PUBLISHING, STATUS_PUBLISHED, STATUS_FAILED, STATUS_REJECTED] + +STATUS_LABELS = { + STATUS_DRAFT: "📝 草稿", + STATUS_APPROVED: "✅ 待发布", + STATUS_SCHEDULED: "🕐 已排期", + STATUS_PUBLISHING: "🚀 发布中", + STATUS_PUBLISHED: "✅ 已发布", + STATUS_FAILED: "❌ 失败", + STATUS_REJECTED: "🚫 已拒绝", +} + +MAX_RETRIES = 2 + + +class PublishQueue: + """发布队列管理器 (SQLite 持久化)""" + + def __init__(self, workspace_dir: str): + self.db_path = os.path.join(workspace_dir, "publish_queue.db") + os.makedirs(workspace_dir, exist_ok=True) + self._init_db() + # 发布中状态恢复 (启动时把 publishing → failed) + self._recover_stale() + + def _get_conn(self) -> sqlite3.Connection: + conn = sqlite3.connect(self.db_path, timeout=10) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + return conn + + def _init_db(self): + """初始化数据库表""" + conn = self._get_conn() + try: + conn.execute(""" + CREATE TABLE IF NOT EXISTS queue ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + title TEXT NOT NULL, + content TEXT NOT NULL DEFAULT '', + sd_prompt TEXT DEFAULT '', + tags TEXT DEFAULT '[]', + image_paths TEXT DEFAULT '[]', + backup_dir TEXT DEFAULT '', + status TEXT NOT NULL DEFAULT 'draft', + scheduled_time TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + published_at TEXT, + topic TEXT DEFAULT '', + style TEXT DEFAULT '', + persona TEXT DEFAULT '', + error_message TEXT DEFAULT '', + retry_count INTEGER DEFAULT 0, + publish_result TEXT DEFAULT '' + ) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_queue_scheduled ON queue(scheduled_time) + """) + conn.commit() + finally: + conn.close() + + def _recover_stale(self): + """启动时将残留的 publishing 状态恢复为 failed""" + conn = self._get_conn() + try: + conn.execute( + "UPDATE queue SET status = ?, error_message = '程序重启,发布中断' " + "WHERE status = ?", + (STATUS_FAILED, STATUS_PUBLISHING), + ) + conn.commit() + finally: + conn.close() + + # ---------- CRUD ---------- + + def add(self, title: str, content: str, sd_prompt: str = "", + tags: list = None, image_paths: list = None, + backup_dir: str = "", topic: str = "", style: str = "", + persona: str = "", status: str = STATUS_DRAFT, + scheduled_time: str = None) -> int: + """添加一个队列项,返回 ID""" + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + conn = self._get_conn() + try: + cur = conn.execute( + """INSERT INTO queue (title, content, sd_prompt, tags, image_paths, + backup_dir, status, scheduled_time, created_at, updated_at, + topic, style, persona) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + (title, content, sd_prompt, + json.dumps(tags or [], ensure_ascii=False), + json.dumps(image_paths or [], ensure_ascii=False), + backup_dir, status, scheduled_time, now, now, + topic, style, persona), + ) + conn.commit() + item_id = cur.lastrowid + logger.info("📋 队列添加 #%d: %s [%s]", item_id, title[:20], status) + return item_id + finally: + conn.close() + + def get(self, item_id: int) -> Optional[dict]: + """获取单个队列项""" + conn = self._get_conn() + try: + row = conn.execute("SELECT * FROM queue WHERE id = ?", (item_id,)).fetchone() + return self._row_to_dict(row) if row else None + finally: + conn.close() + + def update_status(self, item_id: int, status: str, + error_message: str = "", publish_result: str = ""): + """更新状态""" + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + conn = self._get_conn() + try: + fields = "status = ?, updated_at = ?" + params = [status, now] + if error_message: + fields += ", error_message = ?" + params.append(error_message) + if publish_result: + fields += ", publish_result = ?" + params.append(publish_result) + if status == STATUS_PUBLISHED: + fields += ", published_at = ?" + params.append(now) + params.append(item_id) + conn.execute(f"UPDATE queue SET {fields} WHERE id = ?", params) + conn.commit() + finally: + conn.close() + + def update_content(self, item_id: int, title: str = None, content: str = None, + sd_prompt: str = None, tags: list = None, + scheduled_time: str = None): + """更新内容 (仅 draft/approved/scheduled/failed 状态可编辑)""" + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + conn = self._get_conn() + try: + row = conn.execute("SELECT status FROM queue WHERE id = ?", (item_id,)).fetchone() + if not row or row["status"] in (STATUS_PUBLISHING, STATUS_PUBLISHED): + return False + sets, params = ["updated_at = ?"], [now] + if title is not None: + sets.append("title = ?"); params.append(title) + if content is not None: + sets.append("content = ?"); params.append(content) + if sd_prompt is not None: + sets.append("sd_prompt = ?"); params.append(sd_prompt) + if tags is not None: + sets.append("tags = ?"); params.append(json.dumps(tags, ensure_ascii=False)) + if scheduled_time is not None: + sets.append("scheduled_time = ?"); params.append(scheduled_time) + params.append(item_id) + conn.execute(f"UPDATE queue SET {', '.join(sets)} WHERE id = ?", params) + conn.commit() + return True + finally: + conn.close() + + def delete(self, item_id: int) -> bool: + """删除队列项 (仅非 publishing 状态可删)""" + conn = self._get_conn() + try: + row = conn.execute("SELECT status FROM queue WHERE id = ?", (item_id,)).fetchone() + if not row or row["status"] == STATUS_PUBLISHING: + return False + conn.execute("DELETE FROM queue WHERE id = ?", (item_id,)) + conn.commit() + return True + finally: + conn.close() + + def approve(self, item_id: int, scheduled_time: str = None) -> bool: + """审核通过 → 进入待发布或排期""" + conn = self._get_conn() + try: + row = conn.execute("SELECT status FROM queue WHERE id = ?", (item_id,)).fetchone() + if not row or row["status"] not in (STATUS_DRAFT, STATUS_FAILED): + return False + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + new_status = STATUS_SCHEDULED if scheduled_time else STATUS_APPROVED + conn.execute( + "UPDATE queue SET status = ?, scheduled_time = ?, updated_at = ?, " + "error_message = '', retry_count = 0 WHERE id = ?", + (new_status, scheduled_time, now, item_id), + ) + conn.commit() + return True + finally: + conn.close() + + def reject(self, item_id: int) -> bool: + """拒绝/丢弃""" + return self._set_status_if(item_id, STATUS_REJECTED, + allowed_from=[STATUS_DRAFT, STATUS_APPROVED, STATUS_SCHEDULED, STATUS_FAILED]) + + def retry(self, item_id: int) -> bool: + """失败项重试""" + conn = self._get_conn() + try: + row = conn.execute("SELECT status, retry_count FROM queue WHERE id = ?", (item_id,)).fetchone() + if not row or row["status"] != STATUS_FAILED: + return False + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + conn.execute( + "UPDATE queue SET status = ?, updated_at = ?, error_message = '' WHERE id = ?", + (STATUS_APPROVED, now, item_id), + ) + conn.commit() + return True + finally: + conn.close() + + def _set_status_if(self, item_id: int, new_status: str, allowed_from: list) -> bool: + conn = self._get_conn() + try: + row = conn.execute("SELECT status FROM queue WHERE id = ?", (item_id,)).fetchone() + if not row or row["status"] not in allowed_from: + return False + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + conn.execute("UPDATE queue SET status = ?, updated_at = ? WHERE id = ?", + (new_status, now, item_id)) + conn.commit() + return True + finally: + conn.close() + + # ---------- 查询 ---------- + + def list_by_status(self, statuses: list = None, limit: int = 50) -> list[dict]: + """按状态查询队列项""" + conn = self._get_conn() + try: + if statuses: + placeholders = ",".join("?" * len(statuses)) + rows = conn.execute( + f"SELECT * FROM queue WHERE status IN ({placeholders}) " + "ORDER BY created_at DESC LIMIT ?", + statuses + [limit], + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM queue ORDER BY created_at DESC LIMIT ?", (limit,) + ).fetchall() + return [self._row_to_dict(r) for r in rows] + finally: + conn.close() + + def get_pending_publish(self) -> list[dict]: + """获取待发布项: approved 或 scheduled 且已到时间""" + conn = self._get_conn() + try: + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + rows = conn.execute( + """SELECT * FROM queue + WHERE (status = ? OR (status = ? AND scheduled_time <= ?)) + ORDER BY + CASE WHEN scheduled_time IS NOT NULL THEN scheduled_time + ELSE created_at END ASC + LIMIT 10""", + (STATUS_APPROVED, STATUS_SCHEDULED, now), + ).fetchall() + return [self._row_to_dict(r) for r in rows] + finally: + conn.close() + + def count_by_status(self) -> dict: + """统计各状态数量""" + conn = self._get_conn() + try: + rows = conn.execute( + "SELECT status, COUNT(*) as cnt FROM queue GROUP BY status" + ).fetchall() + return {r["status"]: r["cnt"] for r in rows} + finally: + conn.close() + + def get_calendar_data(self, days: int = 30) -> list[dict]: + """获取日历数据 (最近 N 天的发布/排期概览)""" + conn = self._get_conn() + try: + cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d %H:%M:%S") + rows = conn.execute( + """SELECT id, title, status, scheduled_time, published_at, created_at + FROM queue + WHERE created_at >= ? OR scheduled_time >= ? OR published_at >= ? + ORDER BY COALESCE(scheduled_time, published_at, created_at) ASC""", + (cutoff, cutoff, cutoff), + ).fetchall() + return [dict(r) for r in rows] + finally: + conn.close() + + # ---------- 辅助 ---------- + + @staticmethod + def _row_to_dict(row: sqlite3.Row) -> dict: + """Row → dict, 并解析 JSON 字段""" + d = dict(row) + for key in ("tags", "image_paths"): + if key in d and isinstance(d[key], str): + try: + d[key] = json.loads(d[key]) + except json.JSONDecodeError: + d[key] = [] + return d + + def format_queue_table(self, statuses: list = None, limit: int = 30) -> str: + """生成 Markdown 格式的队列表格""" + items = self.list_by_status(statuses, limit) + if not items: + return "📭 队列为空" + + lines = ["| # | 状态 | 标题 | 主题 | 排期时间 | 创建时间 |", + "|---|------|------|------|----------|----------|"] + for item in items: + status_label = STATUS_LABELS.get(item["status"], item["status"]) + sched = item.get("scheduled_time") or "—" + if sched != "—": + sched = sched[:16] # 去掉秒 + created = item["created_at"][:16] if item.get("created_at") else "—" + title_short = (item.get("title") or "")[:18] + topic_short = (item.get("topic") or "")[:10] + lines.append(f"| {item['id']} | {status_label} | {title_short} | {topic_short} | {sched} | {created} |") + + # 统计摘要 + counts = self.count_by_status() + summary_parts = [] + for s, label in STATUS_LABELS.items(): + cnt = counts.get(s, 0) + if cnt > 0: + summary_parts.append(f"{label}: {cnt}") + summary = " · ".join(summary_parts) if summary_parts else "全部为空" + + return f"**队列统计**: {summary}\n\n" + "\n".join(lines) + + def format_calendar(self, days: int = 14) -> str: + """生成简易日历视图 (Markdown)""" + data = self.get_calendar_data(days) + if not data: + return "📅 暂无排期数据" + + # 按日期分组 + by_date = {} + for item in data: + # 优先用排期时间,其次发布时间,最后创建时间 + dt_str = item.get("scheduled_time") or item.get("published_at") or item["created_at"] + date_key = dt_str[:10] if dt_str else "未知" + by_date.setdefault(date_key, []).append(item) + + lines = ["### 📅 内容日历 (近 %d 天)\n" % days] + today = datetime.now().strftime("%Y-%m-%d") + + for date_key in sorted(by_date.keys()): + marker = " 📌 **今天**" if date_key == today else "" + lines.append(f"**{date_key}**{marker}") + for item in by_date[date_key]: + status_icon = STATUS_LABELS.get(item["status"], "❓") + time_part = "" + if item.get("scheduled_time"): + time_part = f" ⏰{item['scheduled_time'][11:16]}" + elif item.get("published_at"): + time_part = f" ✅{item['published_at'][11:16]}" + title_short = (item.get("title") or "无标题")[:20] + lines.append(f" - {status_icon} #{item['id']} {title_short}{time_part}") + lines.append("") + + return "\n".join(lines) + + def format_preview(self, item_id: int) -> str: + """生成单个项目的详细预览 (Markdown)""" + item = self.get(item_id) + if not item: + return "❌ 未找到该队列项" + + status_label = STATUS_LABELS.get(item["status"], item["status"]) + tags = item.get("tags", []) + tags_str = " ".join(f"#{t}" for t in tags) if tags else "无标签" + images = item.get("image_paths", []) + img_count = len(images) if images else 0 + + lines = [ + f"## {status_label} #{item['id']}", + f"### 📌 {item.get('title', '无标题')}", + "", + item.get("content", "无正文"), + "", + f"---", + f"**主题**: {item.get('topic', '—')} · **风格**: {item.get('style', '—')}", + f"**标签**: {tags_str}", + f"**图片**: {img_count} 张", + f"**人设**: {(item.get('persona') or '—')[:30]}", + ] + + if item.get("scheduled_time"): + lines.append(f"**排期**: {item['scheduled_time']}") + if item.get("error_message"): + lines.append(f"**错误**: ❌ {item['error_message']}") + if item.get("backup_dir"): + lines.append(f"**备份**: `{item['backup_dir']}`") + + lines.extend([ + f"**创建**: {item.get('created_at', '—')}", + f"**更新**: {item.get('updated_at', '—')}", + ]) + if item.get("published_at"): + lines.append(f"**发布**: {item['published_at']}") + + return "\n".join(lines) + + +class QueuePublisher: + """后台队列发布处理器""" + + def __init__(self, queue: PublishQueue): + self.queue = queue + self._running = threading.Event() + self._thread = None + self._publish_fn = None # 由外部注册的发布回调 + self._log_fn = None # 日志回调 + + def set_publish_callback(self, fn): + """注册发布回调: fn(item: dict) -> (success: bool, message: str)""" + self._publish_fn = fn + + def set_log_callback(self, fn): + """注册日志回调: fn(msg: str)""" + self._log_fn = fn + + def _log(self, msg: str): + logger.info(msg) + if self._log_fn: + try: + self._log_fn(msg) + except Exception: + pass + + def start(self, check_interval: int = 60): + """启动后台队列处理""" + if self._running.is_set(): + return + self._running.set() + self._thread = threading.Thread( + target=self._loop, args=(check_interval,), daemon=True + ) + self._thread.start() + self._log("📋 发布队列处理器已启动") + + def stop(self): + """停止队列处理""" + self._running.clear() + self._log("📋 发布队列处理器已停止") + + @property + def is_running(self) -> bool: + return self._running.is_set() + + def _loop(self, interval: int): + while self._running.is_set(): + try: + self._process_pending() + except Exception as e: + self._log(f"❌ 队列处理异常: {e}") + logger.error("队列处理异常: %s", e, exc_info=True) + # 等待,但可中断 + for _ in range(interval): + if not self._running.is_set(): + break + time.sleep(1) + + def _process_pending(self): + """处理所有待发布项""" + if not self._publish_fn: + return + + pending = self.queue.get_pending_publish() + if not pending: + return + + for item in pending: + if not self._running.is_set(): + break + + item_id = item["id"] + title = item.get("title", "")[:20] + self._log(f"📋 队列发布 #{item_id}: {title}") + + # 标记为发布中 + self.queue.update_status(item_id, STATUS_PUBLISHING) + + try: + success, message = self._publish_fn(item) + if success: + self.queue.update_status(item_id, STATUS_PUBLISHED, publish_result=message) + self._log(f"✅ 队列发布成功 #{item_id}: {title}") + else: + retry_count = item.get("retry_count", 0) + 1 + if retry_count <= MAX_RETRIES: + # 还有重试机会 → approved 状态等下一轮 + self.queue.update_status(item_id, STATUS_APPROVED, error_message=f"第{retry_count}次失败: {message}") + conn = self.queue._get_conn() + try: + conn.execute("UPDATE queue SET retry_count = ? WHERE id = ?", + (retry_count, item_id)) + conn.commit() + finally: + conn.close() + self._log(f"⚠️ #{item_id} 发布失败 (重试 {retry_count}/{MAX_RETRIES}): {message}") + else: + self.queue.update_status(item_id, STATUS_FAILED, error_message=message) + self._log(f"❌ #{item_id} 发布失败已达重试上限: {message}") + + except Exception as e: + self.queue.update_status(item_id, STATUS_FAILED, error_message=str(e)) + self._log(f"❌ #{item_id} 发布异常: {e}") + + # 发布间隔 (模拟真人) + import random + wait = random.randint(5, 15) + self._log(f"⏳ 等待 {wait}s 后处理下一项...") + for _ in range(wait): + if not self._running.is_set(): + break + time.sleep(1) + + def publish_now(self, item_id: int) -> str: + """立即发布指定项 (不经过后台循环)""" + if not self._publish_fn: + return "❌ 发布回调未注册" + + item = self.queue.get(item_id) + if not item: + return "❌ 未找到队列项" + if item["status"] not in (STATUS_APPROVED, STATUS_SCHEDULED, STATUS_FAILED): + return f"❌ 当前状态 [{STATUS_LABELS.get(item['status'], item['status'])}] 不可发布" + + self.queue.update_status(item_id, STATUS_PUBLISHING) + try: + success, message = self._publish_fn(item) + if success: + self.queue.update_status(item_id, STATUS_PUBLISHED, publish_result=message) + return f"✅ 发布成功: {message}" + else: + self.queue.update_status(item_id, STATUS_FAILED, error_message=message) + return f"❌ 发布失败: {message}" + except Exception as e: + self.queue.update_status(item_id, STATUS_FAILED, error_message=str(e)) + return f"❌ 发布异常: {e}"