xhs_factory/services/publish_queue.py
zhoujie 4d83c0f4a9
Some checks failed
CI / Lint (ruff) (push) Has been cancelled
CI / Import Check (push) Has been cancelled
feat(scheduler): 新增热点自动采集功能并优化发布路径
- 新增热点自动采集后台线程,支持定时搜索关键词并执行 AI 分析,结果缓存至结构化状态
- 新增热点分析状态管理接口,提供线程安全的 `get_last_analysis` 和 `set_last_analysis` 方法
- 新增热点数据桥接函数 `feed_hotspot_to_engine`,将分析结果注入 TopicEngine 实现热点加权推荐
- 新增热点选题下拉组件,分析完成后自动填充推荐选题,选中后自动写入选题输入框
- 优化 `generate_from_hotspot` 函数,自动获取结构化分析摘要并增强生成上下文
- 新增热点自动采集配置节点,支持通过 `config.json` 管理关键词和采集间隔

♻️ refactor(queue): 实现智能排期引擎并统一发布路径

- 新增智能排期引擎,基于 `AnalyticsService` 的 `time_weights` 自动计算最优发布时段
- 新增 `PublishQueue.suggest_schedule_time` 和 `auto_schedule_item` 方法,支持时段冲突检测和内容分布控制
- 修改 `generate_to_queue` 函数,新增 `auto_schedule` 和 `auto_approve` 参数,支持自动排期和自动审核
- 重构 `_scheduler_loop` 的自动发布分支,改为调用 `generate_to_queue` 通过队列发布,统一发布路径
- 重构 `auto_publish_once` 函数,移除直接发布逻辑,改为生成内容入队并返回队列信息
- 新增队列时段使用情况查询方法 `get_slot_usage`,支持 UI 热力图展示

📝 docs(openspec): 新增内容排期优化和热点探测优化规范文档

- 新增 `smart-schedule-engine` 规范,定义智能排期引擎的功能需求和场景
- 新增 `unified-publish-path` 规范,定义统一发布路径的改造方案
- 新增 `hotspot-analysis-state` 规范,定义热点分析状态存储的线程安全接口
- 新增 `hotspot-auto-collector` 规范,定义定时热点自动采集的任务流程
- 新增 `hotspot-engine-bridge` 规范,定义热点数据注入 TopicEngine 的桥接机制
- 新增 `hotspot-topic-selector` 规范,定义热点选题下拉组件的交互行为
- 更新 `services-queue`、`services-scheduler` 和 `services-hotspot` 规范,反映功能修改和新增参数

🔧 chore(config): 新增热点自动采集默认配置

- 在 `DEFAULT_CONFIG` 中新增 `hotspot_auto_collect` 配置节点,包含 `enabled`、`keywords` 和 `interval_hours` 字段
- 提供默认关键词列表 `["穿搭", "美妆", "好物"]` 和默认采集间隔 4 小时

🐛 fix(llm): 增强 JSON 解析容错能力

- 新增 `_try_fix_truncated_json` 方法,尝试修复被 token 限制截断的 JSON 输出
- 支持多种截断场景的自动补全,包括字符串值、数组和嵌套对象的截断修复
- 提高 LLM 分析热点等返回 JSON 的函数的稳定性

💄 style(ui): 优化队列管理和热点探测界面

- 在队列生成区域新增自动排期复选框,勾选后隐藏手动排期输入框
- 在日历视图旁新增推荐时段 Markdown 面板,展示各时段权重和建议热力图
- 在热点探测 Tab 新增推荐选题下拉组件,分析完成后动态填充选项
- 在热点探测 Tab 新增热点自动采集控制区域,支持启动、停止和配置采集参数
2026-02-28 22:22:27 +08:00

775 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
发布队列模块
SQLite 持久化的内容排期 + 发布队列,支持草稿预审、定时发布、失败重试
"""
import sqlite3
import json
import os
import time
import logging
import threading
import base64
from datetime import datetime, timedelta
from typing import Optional
logger = logging.getLogger(__name__)
# Queue item lifecycle states (persisted verbatim in the `status` column).
STATUS_DRAFT = "draft"            # draft, waiting for review
STATUS_APPROVED = "approved"      # reviewed, waiting for a schedule or publishable right away
STATUS_SCHEDULED = "scheduled"    # scheduled for timed publishing
STATUS_PUBLISHING = "publishing"  # publish in progress
STATUS_PUBLISHED = "published"    # published
STATUS_FAILED = "failed"          # publish failed
STATUS_REJECTED = "rejected"      # rejected / discarded

# Every known state, in lifecycle order.
ALL_STATUSES = [
    STATUS_DRAFT,
    STATUS_APPROVED,
    STATUS_SCHEDULED,
    STATUS_PUBLISHING,
    STATUS_PUBLISHED,
    STATUS_FAILED,
    STATUS_REJECTED,
]

# Display label for each state; insertion order drives the summary line
# rendered by the queue table.
STATUS_LABELS = {
    STATUS_DRAFT: "📝 草稿",
    STATUS_APPROVED: "✅ 待发布",
    STATUS_SCHEDULED: "🕐 已排期",
    STATUS_PUBLISHING: "🚀 发布中",
    STATUS_PUBLISHED: "✅ 已发布",
    STATUS_FAILED: "❌ 失败",
    STATUS_REJECTED: "🚫 已拒绝",
}

# Number of automatic retries granted to an item before it is marked failed.
MAX_RETRIES = 2
class PublishQueue:
    """Publish queue manager persisted in SQLite.

    Stores generated posts together with their review / scheduling state and
    offers CRUD helpers, Markdown renderers and a slot-based scheduling
    helper. Every method opens its own short-lived connection and closes it
    before returning.
    """

    # Layout of every timestamp written to the TEXT date columns. Strings in
    # this layout sort chronologically, which the SQL comparisons rely on.
    _TS_FORMAT = "%Y-%m-%d %H:%M:%S"

    # (start_hour, end_hour_exclusive, label) brackets used to bucket an hour
    # of the day into a named time slot.
    _SLOT_BRACKETS = (
        (0, 3, "00-03时"), (3, 6, "03-06时"), (6, 8, "06-08时"),
        (8, 11, "08-11时"), (11, 12, "11-12时"), (12, 14, "12-14时"),
        (14, 18, "14-18时"), (18, 21, "18-21时"), (21, 24, "21-24时"),
    )

    # File extension -> MIME type used when embedding preview images.
    _IMAGE_MIME_TYPES = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp',
        '.bmp': 'image/bmp',
    }

    def __init__(self, workspace_dir: str):
        """Open (creating if needed) ``publish_queue.db`` inside *workspace_dir*."""
        self.db_path = os.path.join(workspace_dir, "publish_queue.db")
        os.makedirs(workspace_dir, exist_ok=True)
        self._init_db()
        # Rows still marked "publishing" at start-up belong to an interrupted
        # run; flip them to "failed" so they can be retried.
        self._recover_stale()

    @classmethod
    def _now_str(cls) -> str:
        """Return the current local time formatted for storage."""
        return datetime.now().strftime(cls._TS_FORMAT)

    def _get_conn(self) -> sqlite3.Connection:
        """Return a new connection with dict-like rows and WAL journaling."""
        conn = sqlite3.connect(self.db_path, timeout=10)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        return conn

    def _init_db(self):
        """Create the ``queue`` table and its indexes if they do not exist."""
        conn = self._get_conn()
        try:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS queue (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title TEXT NOT NULL,
                    content TEXT NOT NULL DEFAULT '',
                    sd_prompt TEXT DEFAULT '',
                    tags TEXT DEFAULT '[]',
                    image_paths TEXT DEFAULT '[]',
                    backup_dir TEXT DEFAULT '',
                    status TEXT NOT NULL DEFAULT 'draft',
                    scheduled_time TEXT,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL,
                    published_at TEXT,
                    topic TEXT DEFAULT '',
                    style TEXT DEFAULT '',
                    persona TEXT DEFAULT '',
                    error_message TEXT DEFAULT '',
                    retry_count INTEGER DEFAULT 0,
                    publish_result TEXT DEFAULT ''
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status)
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_queue_scheduled ON queue(scheduled_time)
            """)
            conn.commit()
        finally:
            conn.close()

    def _recover_stale(self):
        """Mark rows left in ``publishing`` by a previous run as ``failed``."""
        conn = self._get_conn()
        try:
            # updated_at is refreshed too, like every other status transition.
            conn.execute(
                "UPDATE queue SET status = ?, error_message = '程序重启,发布中断', "
                "updated_at = ? WHERE status = ?",
                (STATUS_FAILED, self._now_str(), STATUS_PUBLISHING),
            )
            conn.commit()
        finally:
            conn.close()

    # ---------- CRUD ----------

    def add(self, title: str, content: str, sd_prompt: str = "",
            tags: list = None, image_paths: list = None,
            backup_dir: str = "", topic: str = "", style: str = "",
            persona: str = "", status: str = STATUS_DRAFT,
            scheduled_time: str = None) -> int:
        """Insert a queue item and return its row id.

        ``tags`` and ``image_paths`` are stored as JSON arrays; ``None``
        is stored as an empty array.
        """
        now = self._now_str()
        conn = self._get_conn()
        try:
            cur = conn.execute(
                """INSERT INTO queue (title, content, sd_prompt, tags, image_paths,
                   backup_dir, status, scheduled_time, created_at, updated_at,
                   topic, style, persona)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (title, content, sd_prompt,
                 json.dumps(tags or [], ensure_ascii=False),
                 json.dumps(image_paths or [], ensure_ascii=False),
                 backup_dir, status, scheduled_time, now, now,
                 topic, style, persona),
            )
            conn.commit()
            item_id = cur.lastrowid
            logger.info("📋 队列添加 #%d: %s [%s]", item_id, title[:20], status)
            return item_id
        finally:
            conn.close()

    def get(self, item_id: int) -> Optional[dict]:
        """Return the item with *item_id* as a dict, or ``None`` if absent."""
        conn = self._get_conn()
        try:
            row = conn.execute("SELECT * FROM queue WHERE id = ?", (item_id,)).fetchone()
            return self._row_to_dict(row) if row else None
        finally:
            conn.close()

    def update_status(self, item_id: int, status: str,
                      error_message: str = "", publish_result: str = ""):
        """Set the status of an item unconditionally.

        ``error_message`` and ``publish_result`` are only written when
        non-empty; ``published_at`` is stamped when the new status is
        ``STATUS_PUBLISHED``.
        """
        now = self._now_str()
        assignments = ["status = ?", "updated_at = ?"]
        params = [status, now]
        if error_message:
            assignments.append("error_message = ?")
            params.append(error_message)
        if publish_result:
            assignments.append("publish_result = ?")
            params.append(publish_result)
        if status == STATUS_PUBLISHED:
            assignments.append("published_at = ?")
            params.append(now)
        params.append(item_id)
        conn = self._get_conn()
        try:
            conn.execute(f"UPDATE queue SET {', '.join(assignments)} WHERE id = ?", params)
            conn.commit()
        finally:
            conn.close()

    def update_content(self, item_id: int, title: str = None, content: str = None,
                       sd_prompt: str = None, tags: list = None,
                       scheduled_time: str = None):
        """Edit the given fields of an item; ``None`` means "leave unchanged".

        Returns ``False`` when the item does not exist or is currently
        publishing / already published, ``True`` otherwise.
        """
        # Column names are fixed literals here, never caller input.
        candidates = {
            "title": title,
            "content": content,
            "sd_prompt": sd_prompt,
            "tags": None if tags is None else json.dumps(tags, ensure_ascii=False),
            "scheduled_time": scheduled_time,
        }
        conn = self._get_conn()
        try:
            row = conn.execute("SELECT status FROM queue WHERE id = ?", (item_id,)).fetchone()
            if not row or row["status"] in (STATUS_PUBLISHING, STATUS_PUBLISHED):
                return False
            sets, params = ["updated_at = ?"], [self._now_str()]
            for column, value in candidates.items():
                if value is not None:
                    sets.append(f"{column} = ?")
                    params.append(value)
            params.append(item_id)
            conn.execute(f"UPDATE queue SET {', '.join(sets)} WHERE id = ?", params)
            conn.commit()
            return True
        finally:
            conn.close()

    def delete(self, item_id: int) -> bool:
        """Delete an item unless it is missing or currently publishing."""
        conn = self._get_conn()
        try:
            row = conn.execute("SELECT status FROM queue WHERE id = ?", (item_id,)).fetchone()
            if not row or row["status"] == STATUS_PUBLISHING:
                return False
            conn.execute("DELETE FROM queue WHERE id = ?", (item_id,))
            conn.commit()
            return True
        finally:
            conn.close()

    def approve(self, item_id: int, scheduled_time: str = None) -> bool:
        """Approve a draft or failed item.

        The item becomes ``scheduled`` when *scheduled_time* is given and
        ``approved`` otherwise; its error message and retry counter are reset.
        """
        conn = self._get_conn()
        try:
            row = conn.execute("SELECT status FROM queue WHERE id = ?", (item_id,)).fetchone()
            if not row or row["status"] not in (STATUS_DRAFT, STATUS_FAILED):
                return False
            new_status = STATUS_SCHEDULED if scheduled_time else STATUS_APPROVED
            conn.execute(
                "UPDATE queue SET status = ?, scheduled_time = ?, updated_at = ?, "
                "error_message = '', retry_count = 0 WHERE id = ?",
                (new_status, scheduled_time, self._now_str(), item_id),
            )
            conn.commit()
            return True
        finally:
            conn.close()

    def reject(self, item_id: int) -> bool:
        """Reject / discard an item that has not been published yet."""
        return self._set_status_if(
            item_id, STATUS_REJECTED,
            allowed_from=[STATUS_DRAFT, STATUS_APPROVED, STATUS_SCHEDULED, STATUS_FAILED],
        )

    def retry(self, item_id: int) -> bool:
        """Move a failed item back to ``approved`` and clear its error message.

        The stored ``retry_count`` is left untouched.
        """
        conn = self._get_conn()
        try:
            row = conn.execute("SELECT status FROM queue WHERE id = ?", (item_id,)).fetchone()
            if not row or row["status"] != STATUS_FAILED:
                return False
            conn.execute(
                "UPDATE queue SET status = ?, updated_at = ?, error_message = '' WHERE id = ?",
                (STATUS_APPROVED, self._now_str(), item_id),
            )
            conn.commit()
            return True
        finally:
            conn.close()

    def _set_status_if(self, item_id: int, new_status: str, allowed_from: list) -> bool:
        """Set *new_status* only when the current status is in *allowed_from*."""
        conn = self._get_conn()
        try:
            row = conn.execute("SELECT status FROM queue WHERE id = ?", (item_id,)).fetchone()
            if not row or row["status"] not in allowed_from:
                return False
            conn.execute("UPDATE queue SET status = ?, updated_at = ? WHERE id = ?",
                         (new_status, self._now_str(), item_id))
            conn.commit()
            return True
        finally:
            conn.close()

    # ---------- Queries ----------

    def list_by_status(self, statuses: list = None, limit: int = 50) -> list[dict]:
        """Return up to *limit* items, newest first, optionally filtered by status.

        *statuses* may be any sized collection of status strings; an empty
        or ``None`` value disables the filter.
        """
        conn = self._get_conn()
        try:
            if statuses:
                placeholders = ",".join("?" * len(statuses))
                # Unpack instead of `statuses + [limit]` so tuples and sets work too.
                rows = conn.execute(
                    f"SELECT * FROM queue WHERE status IN ({placeholders}) "
                    "ORDER BY created_at DESC LIMIT ?",
                    [*statuses, limit],
                ).fetchall()
            else:
                rows = conn.execute(
                    "SELECT * FROM queue ORDER BY created_at DESC LIMIT ?", (limit,)
                ).fetchall()
            return [self._row_to_dict(r) for r in rows]
        finally:
            conn.close()

    def get_pending_publish(self) -> list[dict]:
        """Return up to 10 publishable items, oldest due first.

        An item is publishable when it is ``approved``, or ``scheduled``
        with a scheduled time that is not in the future.
        """
        conn = self._get_conn()
        try:
            rows = conn.execute(
                """SELECT * FROM queue
                   WHERE (status = ? OR (status = ? AND scheduled_time <= ?))
                   ORDER BY
                       CASE WHEN scheduled_time IS NOT NULL THEN scheduled_time
                            ELSE created_at END ASC
                   LIMIT 10""",
                (STATUS_APPROVED, STATUS_SCHEDULED, self._now_str()),
            ).fetchall()
            return [self._row_to_dict(r) for r in rows]
        finally:
            conn.close()

    def count_by_status(self) -> dict:
        """Return ``{status: row_count}`` for every status present in the table."""
        conn = self._get_conn()
        try:
            rows = conn.execute(
                "SELECT status, COUNT(*) as cnt FROM queue GROUP BY status"
            ).fetchall()
            return {r["status"]: r["cnt"] for r in rows}
        finally:
            conn.close()

    def get_calendar_data(self, days: int = 30) -> list[dict]:
        """Return summary rows created, scheduled or published since *days* days ago.

        Rows are ordered by scheduled time, falling back to published time
        and then creation time.
        """
        conn = self._get_conn()
        try:
            cutoff = (datetime.now() - timedelta(days=days)).strftime(self._TS_FORMAT)
            rows = conn.execute(
                """SELECT id, title, status, scheduled_time, published_at, created_at
                   FROM queue
                   WHERE created_at >= ? OR scheduled_time >= ? OR published_at >= ?
                   ORDER BY COALESCE(scheduled_time, published_at, created_at) ASC""",
                (cutoff, cutoff, cutoff),
            ).fetchall()
            return [dict(r) for r in rows]
        finally:
            conn.close()

    # ---------- Helpers ----------

    @staticmethod
    def _row_to_dict(row: sqlite3.Row) -> dict:
        """Convert a row to a dict, decoding the JSON ``tags`` / ``image_paths`` columns.

        A column that does not hold valid JSON is replaced by an empty list.
        """
        d = dict(row)
        for key in ("tags", "image_paths"):
            if key in d and isinstance(d[key], str):
                try:
                    d[key] = json.loads(d[key])
                except json.JSONDecodeError:
                    d[key] = []
        return d

    @staticmethod
    def _md_cell(text: str) -> str:
        """Make *text* safe inside a Markdown table cell (no raw ``|`` or newlines)."""
        return text.replace("\r", " ").replace("\n", " ").replace("|", "\\|")

    def format_queue_table(self, statuses: list = None, limit: int = 30) -> str:
        """Render the queue as a Markdown table preceded by a per-status summary."""
        items = self.list_by_status(statuses, limit)
        if not items:
            return "📭 队列为空"
        lines = ["| # | 状态 | 标题 | 主题 | 排期时间 | 创建时间 |",
                 "|---|------|------|------|----------|----------|"]
        for item in items:
            status_label = STATUS_LABELS.get(item["status"], item["status"])
            sched = (item.get("scheduled_time") or "")[:16]  # drop the seconds
            created = item["created_at"][:16] if item.get("created_at") else ""
            # Truncate first, then escape, so a pipe in the text cannot split the row.
            title_short = self._md_cell((item.get("title") or "")[:18])
            topic_short = self._md_cell((item.get("topic") or "")[:10])
            lines.append(
                f"| {item['id']} | {status_label} | {title_short} | {topic_short} | {sched} | {created} |"
            )
        # Summary of non-empty statuses, in STATUS_LABELS order.
        counts = self.count_by_status()
        summary_parts = [
            f"{label}: {counts[s]}"
            for s, label in STATUS_LABELS.items()
            if counts.get(s, 0) > 0
        ]
        summary = " · ".join(summary_parts) if summary_parts else "全部为空"
        return f"**队列统计**: {summary}\n\n" + "\n".join(lines)

    def format_calendar(self, days: int = 14) -> str:
        """Render a simple per-day calendar view (Markdown) of recent items."""
        data = self.get_calendar_data(days)
        if not data:
            return "📅 暂无排期数据"
        # Group by date: scheduled time first, then published time, then creation time.
        by_date = {}
        for item in data:
            dt_str = item.get("scheduled_time") or item.get("published_at") or item["created_at"]
            date_key = dt_str[:10] if dt_str else "未知"
            by_date.setdefault(date_key, []).append(item)
        lines = ["### 📅 内容日历 (近 %d 天)\n" % days]
        today = datetime.now().strftime("%Y-%m-%d")
        for date_key in sorted(by_date):
            marker = " 📌 **今天**" if date_key == today else ""
            lines.append(f"**{date_key}**{marker}")
            for item in by_date[date_key]:
                status_icon = STATUS_LABELS.get(item["status"], "")
                stamp = item.get("scheduled_time") or item.get("published_at")
                # Leading space keeps the HH:MM from running into the title.
                time_part = f" {stamp[11:16]}" if stamp else ""
                title_short = (item.get("title") or "无标题")[:20]
                lines.append(f" - {status_icon} #{item['id']} {title_short}{time_part}")
            lines.append("")
        return "\n".join(lines)

    def _format_image_lines(self, images) -> list[str]:
        """Render the image section of a preview, embedding readable files as data URIs."""
        lines = ["\n### 📷 生成的图片\n"]
        for index, img in enumerate(images, start=1):
            exists = os.path.exists(img)
            status_icon = "✅" if exists else "❌"
            abs_path = os.path.abspath(img) if exists else img
            lines.append(f"{index}. {status_icon} `{abs_path}`")
            if not exists:
                lines.append(" ⚠️ 图片文件不存在")
                lines.append("")
                continue
            try:
                with open(abs_path, 'rb') as f:
                    img_data = f.read()
            except OSError as e:
                lines.append(f" ⚠️ 无法读取图片: {e}")
            else:
                img_base64 = base64.b64encode(img_data).decode('utf-8')
                ext = os.path.splitext(abs_path)[1].lower()
                # Unknown extensions are served as JPEG.
                mime_type = self._IMAGE_MIME_TYPES.get(ext, 'image/jpeg')
                data_uri = f"data:{mime_type};base64,{img_base64}"
                lines.append(f" ![预览图{index}]({data_uri})")
                lines.append(f" <small>图片大小: {len(img_data) / 1024:.1f} KB</small>")
            lines.append("")
        return lines

    def format_preview(self, item_id: int) -> str:
        """Render a detailed Markdown preview of one item, including its images."""
        item = self.get(item_id)
        if not item:
            return "❌ 未找到该队列项"
        status_label = STATUS_LABELS.get(item["status"], item["status"])
        tags = item.get("tags", [])
        tags_str = " ".join(f"#{t}" for t in tags) if tags else "无标签"
        images = item.get("image_paths", [])
        img_count = len(images) if images else 0
        # `or` rather than a .get() default: the keys always exist, so only
        # `or` lets the placeholder text appear for empty values.
        lines = [
            f"## {status_label} #{item['id']}",
            f"### 📌 {item.get('title') or '无标题'}",
            "",
            item.get("content") or "无正文",
            "",
            "---",
            f"**主题**: {item.get('topic') or ''} · **风格**: {item.get('style') or ''}",
            f"**标签**: {tags_str}",
            f"**图片**: {img_count}",
            f"**人设**: {(item.get('persona') or '')[:30]}",
        ]
        if item.get("scheduled_time"):
            lines.append(f"**排期**: {item['scheduled_time']}")
        if item.get("error_message"):
            lines.append(f"**错误**: ❌ {item['error_message']}")
        if item.get("backup_dir"):
            lines.append(f"**备份**: `{item['backup_dir']}`")
        sd_prompt = item.get("sd_prompt")
        if sd_prompt:
            ellipsis = "..." if len(sd_prompt) > 100 else ""
            lines.append(f"**SD提示词**: {sd_prompt[:100]}{ellipsis}")
        lines.extend([
            f"**创建**: {item.get('created_at', '')}",
            f"**更新**: {item.get('updated_at', '')}",
        ])
        if item.get("published_at"):
            lines.append(f"**发布**: {item['published_at']}")
        if images:
            lines.extend(self._format_image_lines(images))
        return "\n".join(lines)

    # ---------- Scheduling helper ----------

    def get_slot_usage(self, days: int = 7) -> dict:
        """Count scheduled items per date and time slot within the next *days* × 24 hours.

        Only ``scheduled`` / ``approved`` rows whose scheduled time lies
        between now and now + *days* days are counted.

        Returns e.g. ``{"2026-02-28": {"18-21时": 1, "08-11时": 2}, ...}``.
        """
        conn = self._get_conn()
        try:
            now = datetime.now()
            cutoff = (now + timedelta(days=days)).strftime(self._TS_FORMAT)
            rows = conn.execute(
                "SELECT scheduled_time FROM queue "
                "WHERE status IN (?, ?) AND scheduled_time IS NOT NULL "
                "AND scheduled_time >= ? AND scheduled_time <= ?",
                (STATUS_SCHEDULED, STATUS_APPROVED, now.strftime(self._TS_FORMAT), cutoff),
            ).fetchall()
            usage: dict[str, dict[str, int]] = {}
            for row in rows:
                st = row["scheduled_time"]
                if not st:
                    continue
                try:
                    dt = datetime.strptime(st[:19], self._TS_FORMAT)
                except (ValueError, TypeError):
                    continue  # skip values that are not in the storage layout
                day_slots = usage.setdefault(dt.strftime("%Y-%m-%d"), {})
                slot = self._hour_to_slot(dt.hour)
                day_slots[slot] = day_slots.get(slot, 0) + 1
            return usage
        finally:
            conn.close()

    @staticmethod
    def _hour_to_slot(hour: int) -> str:
        """Map an hour of the day to its slot label; out-of-range hours map to ``21-24时``."""
        for lo, hi, label in PublishQueue._SLOT_BRACKETS:
            if lo <= hour < hi:
                return label
        return "21-24时"

    @staticmethod
    def _slot_to_hour_range(slot: str) -> tuple[int, int]:
        """Parse ``"HH-HH时"`` into ``(start, end)``; unparseable labels yield ``(18, 21)``."""
        import re as _re
        m = _re.match(r"(\d{2})-(\d{2})时", slot)
        if m:
            return int(m.group(1)), int(m.group(2))
        return 18, 21  # fallback

    def suggest_schedule_time(self, analytics, max_per_slot: int = 2,
                              max_per_day: int = 5) -> str | None:
        """Pick a publish time from the slot weights and the existing schedule.

        *analytics* must provide ``get_time_weights()`` returning a mapping
        of slot label to either a number or a dict with a ``"weight"`` key.
        Days are tried from today through seven days ahead; within a day,
        slots are tried from highest to lowest weight, skipping slots that
        already hold *max_per_slot* items and days that already hold
        *max_per_day* items.

        Returns a ``'%Y-%m-%d %H:%M:%S'`` string, or ``None`` when there
        are no weights or every candidate slot is full.
        """
        import random as _random

        time_weights = analytics.get_time_weights()
        if not time_weights:
            return None

        def _weight(entry):
            value = entry[1]
            return value if isinstance(value, (int, float)) else value.get("weight", 0)

        ranked_slots = [name for name, _ in
                        sorted(time_weights.items(), key=_weight, reverse=True)]
        # The loop below covers 8 calendar days (today + 7). The usage window
        # is measured in 24h steps from *now*, so 8 days are needed to include
        # all of the last calendar day; 7 would leave its later slots uncounted.
        usage = self.get_slot_usage(days=8)
        now = datetime.now()
        for day_offset in range(8):
            target_date = now + timedelta(days=day_offset)
            day_usage = usage.get(target_date.strftime("%Y-%m-%d"), {})
            if sum(day_usage.values()) >= max_per_day:
                continue
            is_today = day_offset == 0
            for slot_name in ranked_slots:
                if day_usage.get(slot_name, 0) >= max_per_slot:
                    continue
                start_hour, end_hour = self._slot_to_hour_range(slot_name)
                # Today: skip slots that are already over.
                if is_today and end_hour <= now.hour:
                    continue
                # Today, slot in progress: start from the next full hour.
                effective_start = start_hour
                if is_today and start_hour <= now.hour < end_hour:
                    effective_start = now.hour + 1
                if effective_start >= end_hour:
                    continue
                # Random minute inside the slot.
                scheduled = target_date.replace(
                    hour=_random.randint(effective_start, end_hour - 1),
                    minute=_random.randint(0, 59),
                    second=0, microsecond=0,
                )
                return scheduled.strftime(self._TS_FORMAT)
        return None

    def auto_schedule_item(self, item_id: int, analytics,
                           max_per_slot: int = 2, max_per_day: int = 5) -> bool:
        """Assign a suggested publish time to a draft or approved item.

        Returns ``True`` and sets the item to ``scheduled`` on success;
        returns ``False`` when the item is missing, is in another state, or
        no slot is available.
        """
        item = self.get(item_id)
        if not item or item["status"] not in (STATUS_DRAFT, STATUS_APPROVED):
            return False
        scheduled_time = self.suggest_schedule_time(
            analytics, max_per_slot=max_per_slot, max_per_day=max_per_day,
        )
        if not scheduled_time:
            logger.warning("auto_schedule_item #%d: 未来7天无可用时段", item_id)
            return False
        conn = self._get_conn()
        try:
            # Re-check the status inside the UPDATE so an item that changed
            # state after the check above is not forced back to "scheduled".
            cur = conn.execute(
                "UPDATE queue SET status = ?, scheduled_time = ?, updated_at = ? "
                "WHERE id = ? AND status IN (?, ?)",
                (STATUS_SCHEDULED, scheduled_time, self._now_str(), item_id,
                 STATUS_DRAFT, STATUS_APPROVED),
            )
            conn.commit()
            if cur.rowcount == 0:
                return False
            logger.info("auto_schedule_item #%d → %s", item_id, scheduled_time)
            return True
        finally:
            conn.close()
class QueuePublisher:
    """Background worker that publishes due queue items through a callback.

    The publish callback is registered with ``set_publish_callback`` and is
    called as ``fn(item: dict) -> (success: bool, message: str)``.
    """

    def __init__(self, queue: "PublishQueue"):
        self.queue = queue
        self._running = threading.Event()  # set while the worker should run
        self._thread = None                # the current worker thread, if any
        self._publish_fn = None            # publish callback registered by the caller
        self._log_fn = None                # optional extra log sink

    def set_publish_callback(self, fn):
        """Register the publish callback: ``fn(item: dict) -> (success, message)``."""
        self._publish_fn = fn

    def set_log_callback(self, fn):
        """Register an extra log sink: ``fn(msg: str)``."""
        self._log_fn = fn

    def _log(self, msg: str):
        """Log *msg* and forward it to the log callback, if one is registered."""
        logger.info(msg)
        if self._log_fn:
            try:
                self._log_fn(msg)
            except Exception:
                # Best-effort: a broken log sink must not stop publishing,
                # but leave a trace instead of swallowing silently.
                logger.debug("log callback failed", exc_info=True)

    def start(self, check_interval: int = 60):
        """Start the background worker; a no-op when it is already running.

        *check_interval* is the number of seconds between queue polls.
        """
        if self._running.is_set():
            return
        self._running.set()
        # Assigned before start() so the new thread sees itself as the
        # current worker on its first loop check.
        self._thread = threading.Thread(
            target=self._loop, args=(check_interval,), daemon=True
        )
        self._thread.start()
        self._log("📋 发布队列处理器已启动")

    def stop(self):
        """Ask the worker to stop; it exits at its next check."""
        self._running.clear()
        self._log("📋 发布队列处理器已停止")

    @property
    def is_running(self) -> bool:
        return self._running.is_set()

    def _is_current_worker(self) -> bool:
        """True while running and the calling thread is the thread started last."""
        return self._running.is_set() and threading.current_thread() is self._thread

    def _loop(self, interval: int):
        """Worker body: process pending items, then sleep *interval* seconds.

        The identity check makes a worker from an earlier ``start()`` exit
        once a newer one has replaced it. Without it, ``stop()`` followed
        quickly by ``start()`` would leave two loops publishing.
        """
        while self._is_current_worker():
            try:
                self._process_pending()
            except Exception as e:
                self._log(f"❌ 队列处理异常: {e}")
                logger.error("队列处理异常: %s", e, exc_info=True)
            # Sleep in one-second steps so a stop request is noticed quickly.
            for _ in range(interval):
                if not self._is_current_worker():
                    break
                time.sleep(1)

    def _record_retry(self, item_id: int, retry_count: int, message: str):
        """Send a failed item back to ``approved`` and store its retry count in one UPDATE."""
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        conn = self.queue._get_conn()
        try:
            conn.execute(
                "UPDATE queue SET status = ?, updated_at = ?, error_message = ?, "
                "retry_count = ? WHERE id = ?",
                (STATUS_APPROVED, now, f"{retry_count}次失败: {message}",
                 retry_count, item_id),
            )
            conn.commit()
        finally:
            conn.close()

    def _process_pending(self):
        """Publish every currently due item, pausing a few seconds between items.

        Does nothing when no publish callback is registered. A failed
        publish is sent back to ``approved`` until ``MAX_RETRIES`` is
        exceeded, then marked ``failed``; an exception marks the item
        ``failed`` immediately.
        """
        import random

        if not self._publish_fn:
            return
        pending = self.queue.get_pending_publish()
        if not pending:
            return
        last_index = len(pending) - 1
        for index, item in enumerate(pending):
            if not self._running.is_set():
                break
            item_id = item["id"]
            title = (item.get("title") or "")[:20]
            self._log(f"📋 队列发布 #{item_id}: {title}")
            self.queue.update_status(item_id, STATUS_PUBLISHING)
            try:
                success, message = self._publish_fn(item)
                if success:
                    self.queue.update_status(item_id, STATUS_PUBLISHED, publish_result=message)
                    self._log(f"✅ 队列发布成功 #{item_id}: {title}")
                else:
                    retry_count = (item.get("retry_count") or 0) + 1
                    if retry_count <= MAX_RETRIES:
                        # Retries left: back to approved for the next round.
                        self._record_retry(item_id, retry_count, message)
                        self._log(f"⚠️ #{item_id} 发布失败 (重试 {retry_count}/{MAX_RETRIES}): {message}")
                    else:
                        self.queue.update_status(item_id, STATUS_FAILED, error_message=message)
                        self._log(f"❌ #{item_id} 发布失败已达重试上限: {message}")
            except Exception as e:
                self.queue.update_status(item_id, STATUS_FAILED, error_message=str(e))
                self._log(f"❌ #{item_id} 发布异常: {e}")
            if index == last_index:
                break  # nothing follows, so no pause is needed
            # Random gap between posts to mimic a human publishing pace.
            wait = random.randint(5, 15)
            self._log(f"⏳ 等待 {wait}s 后处理下一项...")
            for _ in range(wait):
                if not self._running.is_set():
                    break
                time.sleep(1)

    def publish_now(self, item_id: int) -> str:
        """Publish one item immediately, bypassing the background loop.

        Only ``approved``, ``scheduled`` or ``failed`` items are accepted.
        Returns a user-facing status message; a failure here marks the item
        ``failed`` without consuming automatic retries.
        """
        if not self._publish_fn:
            return "❌ 发布回调未注册"
        item = self.queue.get(item_id)
        if not item:
            return "❌ 未找到队列项"
        if item["status"] not in (STATUS_APPROVED, STATUS_SCHEDULED, STATUS_FAILED):
            return f"❌ 当前状态 [{STATUS_LABELS.get(item['status'], item['status'])}] 不可发布"
        self.queue.update_status(item_id, STATUS_PUBLISHING)
        try:
            success, message = self._publish_fn(item)
            if success:
                self.queue.update_status(item_id, STATUS_PUBLISHED, publish_result=message)
                return f"✅ 发布成功: {message}"
            self.queue.update_status(item_id, STATUS_FAILED, error_message=message)
            return f"❌ 发布失败: {message}"
        except Exception as e:
            self.queue.update_status(item_id, STATUS_FAILED, error_message=str(e))
            return f"❌ 发布异常: {e}"