- 新增热点自动采集后台线程,支持定时搜索关键词并执行 AI 分析,结果缓存至结构化状态
- 新增热点分析状态管理接口,提供线程安全的 `get_last_analysis` 和 `set_last_analysis` 方法
- 新增热点数据桥接函数 `feed_hotspot_to_engine`,将分析结果注入 TopicEngine 实现热点加权推荐
- 新增热点选题下拉组件,分析完成后自动填充推荐选题,选中后自动写入选题输入框
- 优化 `generate_from_hotspot` 函数,自动获取结构化分析摘要并增强生成上下文
- 新增热点自动采集配置节点,支持通过 `config.json` 管理关键词和采集间隔

♻️ refactor(queue): 实现智能排期引擎并统一发布路径
- 新增智能排期引擎,基于 `AnalyticsService` 的 `time_weights` 自动计算最优发布时段
- 新增 `PublishQueue.suggest_schedule_time` 和 `auto_schedule_item` 方法,支持时段冲突检测和内容分布控制
- 修改 `generate_to_queue` 函数,新增 `auto_schedule` 和 `auto_approve` 参数,支持自动排期和自动审核
- 重构 `_scheduler_loop` 的自动发布分支,改为调用 `generate_to_queue` 通过队列发布,统一发布路径
- 重构 `auto_publish_once` 函数,移除直接发布逻辑,改为生成内容入队并返回队列信息
- 新增队列时段使用情况查询方法 `get_slot_usage`,支持 UI 热力图展示

📝 docs(openspec): 新增内容排期优化和热点探测优化规范文档
- 新增 `smart-schedule-engine` 规范,定义智能排期引擎的功能需求和场景
- 新增 `unified-publish-path` 规范,定义统一发布路径的改造方案
- 新增 `hotspot-analysis-state` 规范,定义热点分析状态存储的线程安全接口
- 新增 `hotspot-auto-collector` 规范,定义定时热点自动采集的任务流程
- 新增 `hotspot-engine-bridge` 规范,定义热点数据注入 TopicEngine 的桥接机制
- 新增 `hotspot-topic-selector` 规范,定义热点选题下拉组件的交互行为
- 更新 `services-queue`、`services-scheduler` 和 `services-hotspot` 规范,反映功能修改和新增参数

🔧 chore(config): 新增热点自动采集默认配置
- 在 `DEFAULT_CONFIG` 中新增 `hotspot_auto_collect` 配置节点,包含 `enabled`、`keywords` 和 `interval_hours` 字段
- 提供默认关键词列表 `["穿搭", "美妆", "好物"]` 和默认采集间隔 4 小时

🐛 fix(llm): 增强 JSON 解析容错能力
- 新增 `_try_fix_truncated_json` 方法,尝试修复被 token 限制截断的 JSON 输出
- 支持多种截断场景的自动补全,包括字符串值、数组和嵌套对象的截断修复
- 提高 LLM 分析热点等返回 JSON 的函数的稳定性

💄 style(ui): 优化队列管理和热点探测界面
- 在队列生成区域新增自动排期复选框,勾选后隐藏手动排期输入框
- 在日历视图旁新增推荐时段 Markdown 面板,展示各时段权重和建议热力图
- 在热点探测 Tab 新增推荐选题下拉组件,分析完成后动态填充选项
- 在热点探测 Tab 新增热点自动采集控制区域,支持启动、停止和配置采集参数
775 lines
30 KiB
Python
775 lines
30 KiB
Python
"""
|
||
发布队列模块
|
||
SQLite 持久化的内容排期 + 发布队列,支持草稿预审、定时发布、失败重试
|
||
"""
|
||
import sqlite3
|
||
import json
|
||
import os
|
||
import time
|
||
import logging
|
||
import threading
|
||
import base64
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional
|
||
|
||
logger = logging.getLogger(__name__)

# Lifecycle states of a queue item.
STATUS_DRAFT = "draft"            # draft: waiting for review
STATUS_APPROVED = "approved"      # reviewed: waiting for a schedule, or publishable right away
STATUS_SCHEDULED = "scheduled"    # scheduled: to be published at a set time
STATUS_PUBLISHING = "publishing"  # a publish attempt is in progress
STATUS_PUBLISHED = "published"    # published
STATUS_FAILED = "failed"          # publishing failed
STATUS_REJECTED = "rejected"      # rejected / discarded

ALL_STATUSES = [
    STATUS_DRAFT,
    STATUS_APPROVED,
    STATUS_SCHEDULED,
    STATUS_PUBLISHING,
    STATUS_PUBLISHED,
    STATUS_FAILED,
    STATUS_REJECTED,
]

# Display label shown for each status.
STATUS_LABELS = {
    STATUS_DRAFT: "📝 草稿",
    STATUS_APPROVED: "✅ 待发布",
    STATUS_SCHEDULED: "🕐 已排期",
    STATUS_PUBLISHING: "🚀 发布中",
    STATUS_PUBLISHED: "✅ 已发布",
    STATUS_FAILED: "❌ 失败",
    STATUS_REJECTED: "🚫 已拒绝",
}

# Upper bound the background publisher compares an item's retry counter
# against after a failed publish attempt.
MAX_RETRIES = 2
|
||
|
||
|
||
class PublishQueue:
|
||
"""发布队列管理器 (SQLite 持久化)"""
|
||
|
||
def __init__(self, workspace_dir: str):
    """Open the queue database stored under *workspace_dir*.

    The directory is created when missing, the schema is ensured, and
    items left in the ``publishing`` state are recovered.
    """
    os.makedirs(workspace_dir, exist_ok=True)
    self.db_path = os.path.join(workspace_dir, "publish_queue.db")
    self._init_db()
    # On startup, turn leftover "publishing" rows into "failed".
    self._recover_stale()
|
||
|
||
def _get_conn(self) -> sqlite3.Connection:
|
||
conn = sqlite3.connect(self.db_path, timeout=10)
|
||
conn.row_factory = sqlite3.Row
|
||
conn.execute("PRAGMA journal_mode=WAL")
|
||
return conn
|
||
|
||
def _init_db(self):
|
||
"""初始化数据库表"""
|
||
conn = self._get_conn()
|
||
try:
|
||
conn.execute("""
|
||
CREATE TABLE IF NOT EXISTS queue (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
title TEXT NOT NULL,
|
||
content TEXT NOT NULL DEFAULT '',
|
||
sd_prompt TEXT DEFAULT '',
|
||
tags TEXT DEFAULT '[]',
|
||
image_paths TEXT DEFAULT '[]',
|
||
backup_dir TEXT DEFAULT '',
|
||
status TEXT NOT NULL DEFAULT 'draft',
|
||
scheduled_time TEXT,
|
||
created_at TEXT NOT NULL,
|
||
updated_at TEXT NOT NULL,
|
||
published_at TEXT,
|
||
topic TEXT DEFAULT '',
|
||
style TEXT DEFAULT '',
|
||
persona TEXT DEFAULT '',
|
||
error_message TEXT DEFAULT '',
|
||
retry_count INTEGER DEFAULT 0,
|
||
publish_result TEXT DEFAULT ''
|
||
)
|
||
""")
|
||
conn.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status)
|
||
""")
|
||
conn.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_queue_scheduled ON queue(scheduled_time)
|
||
""")
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
def _recover_stale(self):
    """Mark every row still in the ``publishing`` state as ``failed``.

    Called from ``__init__``; the rows get a fixed error message saying the
    publish was interrupted by a program restart.
    """
    recover_sql = (
        "UPDATE queue SET status = ?, error_message = '程序重启,发布中断' "
        "WHERE status = ?"
    )
    conn = self._get_conn()
    try:
        conn.execute(recover_sql, (STATUS_FAILED, STATUS_PUBLISHING))
        conn.commit()
    finally:
        conn.close()
|
||
|
||
# ---------- CRUD ----------
|
||
|
||
def add(self, title: str, content: str, sd_prompt: str = "",
        tags: list = None, image_paths: list = None,
        backup_dir: str = "", topic: str = "", style: str = "",
        persona: str = "", status: str = STATUS_DRAFT,
        scheduled_time: str = None) -> int:
    """Insert a new queue item and return its row id.

    ``tags`` and ``image_paths`` are stored as JSON text (an empty list when
    omitted). ``created_at`` and ``updated_at`` are both set to the current
    local time.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    tags_json = json.dumps(tags or [], ensure_ascii=False)
    images_json = json.dumps(image_paths or [], ensure_ascii=False)
    values = (
        title, content, sd_prompt,
        tags_json,
        images_json,
        backup_dir, status, scheduled_time, timestamp, timestamp,
        topic, style, persona,
    )
    conn = self._get_conn()
    try:
        cursor = conn.execute(
            """INSERT INTO queue (title, content, sd_prompt, tags, image_paths,
            backup_dir, status, scheduled_time, created_at, updated_at,
            topic, style, persona)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            values,
        )
        conn.commit()
        new_id = cursor.lastrowid
        logger.info("📋 队列添加 #%d: %s [%s]", new_id, title[:20], status)
        return new_id
    finally:
        conn.close()
|
||
|
||
def get(self, item_id: int) -> Optional[dict]:
|
||
"""获取单个队列项"""
|
||
conn = self._get_conn()
|
||
try:
|
||
row = conn.execute("SELECT * FROM queue WHERE id = ?", (item_id,)).fetchone()
|
||
return self._row_to_dict(row) if row else None
|
||
finally:
|
||
conn.close()
|
||
|
||
def update_status(self, item_id: int, status: str,
                  error_message: str = "", publish_result: str = "") -> None:
    """Set the status of an item and stamp ``updated_at``.

    Args:
        item_id: Row id of the queue item.
        status: New status value; it is written unconditionally.
        error_message: Stored when non-empty. When empty, the existing
            message is kept, except on a transition to ``published``.
        publish_result: Stored when non-empty.

    A transition to ``published`` also sets ``published_at`` and clears
    ``error_message``, so a message left by an earlier failed attempt is not
    shown as an error on a successfully published item.
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    assignments = {"status": status, "updated_at": now}
    if error_message:
        assignments["error_message"] = error_message
    elif status == STATUS_PUBLISHED:
        assignments["error_message"] = ""
    if publish_result:
        assignments["publish_result"] = publish_result
    if status == STATUS_PUBLISHED:
        assignments["published_at"] = now

    # Column names come from the fixed keys above; all values are bound.
    set_clause = ", ".join(f"{column} = ?" for column in assignments)
    conn = self._get_conn()
    try:
        conn.execute(f"UPDATE queue SET {set_clause} WHERE id = ?",
                     [*assignments.values(), item_id])
        conn.commit()
    finally:
        conn.close()
|
||
|
||
def update_content(self, item_id: int, title: str = None, content: str = None,
                   sd_prompt: str = None, tags: list = None,
                   scheduled_time: str = None) -> bool:
    """Edit the stored content of a queue item.

    Only arguments that are not ``None`` are written; ``updated_at`` is always
    refreshed. ``None`` means "leave unchanged", so this method cannot clear
    ``scheduled_time``.

    Returns:
        ``True`` when the row was updated; ``False`` when the item does not
        exist or is currently ``publishing`` / ``published``.
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    changes = {"updated_at": now}
    if title is not None:
        changes["title"] = title
    if content is not None:
        changes["content"] = content
    if sd_prompt is not None:
        changes["sd_prompt"] = sd_prompt
    if tags is not None:
        changes["tags"] = json.dumps(tags, ensure_ascii=False)
    if scheduled_time is not None:
        changes["scheduled_time"] = scheduled_time

    set_clause = ", ".join(f"{column} = ?" for column in changes)
    conn = self._get_conn()
    try:
        # The status guard lives in the WHERE clause so that check and write
        # are one statement; a publish starting in between cannot be overwritten.
        cursor = conn.execute(
            f"UPDATE queue SET {set_clause} WHERE id = ? AND status NOT IN (?, ?)",
            [*changes.values(), item_id, STATUS_PUBLISHING, STATUS_PUBLISHED],
        )
        conn.commit()
        return cursor.rowcount > 0
    finally:
        conn.close()
|
||
|
||
def delete(self, item_id: int) -> bool:
    """Delete a queue item unless it is currently being published.

    Returns:
        ``True`` when a row was deleted; ``False`` when the item does not
        exist or its status is ``publishing``.
    """
    conn = self._get_conn()
    try:
        # Single conditional statement: the status check and the delete
        # cannot be separated by a concurrent status change.
        cursor = conn.execute(
            "DELETE FROM queue WHERE id = ? AND status != ?",
            (item_id, STATUS_PUBLISHING),
        )
        conn.commit()
        return cursor.rowcount > 0
    finally:
        conn.close()
|
||
|
||
def approve(self, item_id: int, scheduled_time: str = None) -> bool:
    """Approve a draft or failed item.

    With a ``scheduled_time`` the item becomes ``scheduled``; without one it
    becomes ``approved``. The stored ``scheduled_time`` is overwritten
    (``NULL`` when none is given), and ``error_message`` / ``retry_count``
    are reset.

    Returns:
        ``True`` on success; ``False`` when the item does not exist or is not
        in the ``draft`` / ``failed`` state.
    """
    # Treat an empty string like "no schedule" so '' is never stored.
    scheduled_time = scheduled_time or None
    new_status = STATUS_SCHEDULED if scheduled_time else STATUS_APPROVED
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    conn = self._get_conn()
    try:
        # Guard and write in one statement so a concurrent transition
        # cannot slip in between them.
        cursor = conn.execute(
            "UPDATE queue SET status = ?, scheduled_time = ?, updated_at = ?, "
            "error_message = '', retry_count = 0 "
            "WHERE id = ? AND status IN (?, ?)",
            (new_status, scheduled_time, now, item_id, STATUS_DRAFT, STATUS_FAILED),
        )
        conn.commit()
        return cursor.rowcount > 0
    finally:
        conn.close()
|
||
|
||
def reject(self, item_id: int) -> bool:
    """Reject (discard) an item that is draft, approved, scheduled or failed.

    Returns whatever ``_set_status_if`` reports for that transition.
    """
    rejectable = [STATUS_DRAFT, STATUS_APPROVED, STATUS_SCHEDULED, STATUS_FAILED]
    return self._set_status_if(item_id, STATUS_REJECTED, allowed_from=rejectable)
|
||
|
||
def retry(self, item_id: int) -> bool:
    """Put a failed item back into the ``approved`` state.

    ``error_message`` is cleared; ``retry_count`` is left as it is.

    Returns:
        ``True`` on success; ``False`` when the item does not exist or is not
        in the ``failed`` state.
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    conn = self._get_conn()
    try:
        # The status condition is part of the UPDATE, so the check and the
        # transition happen atomically.
        cursor = conn.execute(
            "UPDATE queue SET status = ?, updated_at = ?, error_message = '' "
            "WHERE id = ? AND status = ?",
            (STATUS_APPROVED, now, item_id, STATUS_FAILED),
        )
        conn.commit()
        return cursor.rowcount > 0
    finally:
        conn.close()
|
||
|
||
def _set_status_if(self, item_id: int, new_status: str, allowed_from: list) -> bool:
|
||
conn = self._get_conn()
|
||
try:
|
||
row = conn.execute("SELECT status FROM queue WHERE id = ?", (item_id,)).fetchone()
|
||
if not row or row["status"] not in allowed_from:
|
||
return False
|
||
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
conn.execute("UPDATE queue SET status = ?, updated_at = ? WHERE id = ?",
|
||
(new_status, now, item_id))
|
||
conn.commit()
|
||
return True
|
||
finally:
|
||
conn.close()
|
||
|
||
# ---------- 查询 ----------
|
||
|
||
def list_by_status(self, statuses: list = None, limit: int = 50) -> list[dict]:
|
||
"""按状态查询队列项"""
|
||
conn = self._get_conn()
|
||
try:
|
||
if statuses:
|
||
placeholders = ",".join("?" * len(statuses))
|
||
rows = conn.execute(
|
||
f"SELECT * FROM queue WHERE status IN ({placeholders}) "
|
||
"ORDER BY created_at DESC LIMIT ?",
|
||
statuses + [limit],
|
||
).fetchall()
|
||
else:
|
||
rows = conn.execute(
|
||
"SELECT * FROM queue ORDER BY created_at DESC LIMIT ?", (limit,)
|
||
).fetchall()
|
||
return [self._row_to_dict(r) for r in rows]
|
||
finally:
|
||
conn.close()
|
||
|
||
def get_pending_publish(self) -> list[dict]:
    """Return up to 10 items that are due for publishing.

    An item is due when it is ``approved``, or ``scheduled`` with a
    ``scheduled_time`` not later than now. Rows are ordered by
    ``scheduled_time`` when set, otherwise by ``created_at``, ascending.
    """
    conn = self._get_conn()
    try:
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        cursor = conn.execute(
            """SELECT * FROM queue
            WHERE (status = ? OR (status = ? AND scheduled_time <= ?))
            ORDER BY
            CASE WHEN scheduled_time IS NOT NULL THEN scheduled_time
            ELSE created_at END ASC
            LIMIT 10""",
            (STATUS_APPROVED, STATUS_SCHEDULED, current_time),
        )
        due_items = []
        for record in cursor.fetchall():
            due_items.append(self._row_to_dict(record))
        return due_items
    finally:
        conn.close()
|
||
|
||
def count_by_status(self) -> dict:
|
||
"""统计各状态数量"""
|
||
conn = self._get_conn()
|
||
try:
|
||
rows = conn.execute(
|
||
"SELECT status, COUNT(*) as cnt FROM queue GROUP BY status"
|
||
).fetchall()
|
||
return {r["status"]: r["cnt"] for r in rows}
|
||
finally:
|
||
conn.close()
|
||
|
||
def get_calendar_data(self, days: int = 30) -> list[dict]:
|
||
"""获取日历数据 (最近 N 天的发布/排期概览)"""
|
||
conn = self._get_conn()
|
||
try:
|
||
cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d %H:%M:%S")
|
||
rows = conn.execute(
|
||
"""SELECT id, title, status, scheduled_time, published_at, created_at
|
||
FROM queue
|
||
WHERE created_at >= ? OR scheduled_time >= ? OR published_at >= ?
|
||
ORDER BY COALESCE(scheduled_time, published_at, created_at) ASC""",
|
||
(cutoff, cutoff, cutoff),
|
||
).fetchall()
|
||
return [dict(r) for r in rows]
|
||
finally:
|
||
conn.close()
|
||
|
||
# ---------- 辅助 ----------
|
||
|
||
@staticmethod
|
||
def _row_to_dict(row: sqlite3.Row) -> dict:
|
||
"""Row → dict, 并解析 JSON 字段"""
|
||
d = dict(row)
|
||
for key in ("tags", "image_paths"):
|
||
if key in d and isinstance(d[key], str):
|
||
try:
|
||
d[key] = json.loads(d[key])
|
||
except json.JSONDecodeError:
|
||
d[key] = []
|
||
return d
|
||
|
||
def format_queue_table(self, statuses: list = None, limit: int = 30) -> str:
    """Render queue items as a Markdown table preceded by a status summary.

    Args:
        statuses: Optional status filter passed to ``list_by_status``.
        limit: Maximum number of rows listed.

    Returns:
        Markdown text, or a short "queue is empty" message when nothing matches.
        The summary line counts items of every status, not just listed ones.
    """
    items = self.list_by_status(statuses, limit)
    if not items:
        return "📭 队列为空"

    def cell(text, width: int) -> str:
        # Pipes and line breaks would split the Markdown row, so neutralise them.
        clipped = (text or "")[:width]
        return clipped.replace("\r", " ").replace("\n", " ").replace("|", "\\|")

    lines = ["| # | 状态 | 标题 | 主题 | 排期时间 | 创建时间 |",
             "|---|------|------|------|----------|----------|"]
    for item in items:
        status_label = STATUS_LABELS.get(item["status"], item["status"])
        # [:16] drops the seconds from "YYYY-MM-DD HH:MM:SS".
        scheduled = item.get("scheduled_time")
        sched = scheduled[:16] if scheduled else "—"
        created = item["created_at"][:16] if item.get("created_at") else "—"
        title_short = cell(item.get("title"), 18)
        topic_short = cell(item.get("topic"), 10)
        lines.append(
            f"| {item['id']} | {status_label} | {title_short} | {topic_short} | {sched} | {created} |"
        )

    counts = self.count_by_status()
    summary_parts = [
        f"{label}: {counts[status]}"
        for status, label in STATUS_LABELS.items()
        if counts.get(status, 0) > 0
    ]
    summary = " · ".join(summary_parts) if summary_parts else "全部为空"

    return f"**队列统计**: {summary}\n\n" + "\n".join(lines)
|
||
|
||
def format_calendar(self, days: int = 14) -> str:
    """Render a simple Markdown calendar of the items from ``get_calendar_data``.

    Items are grouped by date, taken from the scheduled time, else the
    published time, else the creation time. Today's heading gets a marker.
    """
    entries = self.get_calendar_data(days)
    if not entries:
        return "📅 暂无排期数据"

    grouped = {}
    for entry in entries:
        stamp = entry.get("scheduled_time") or entry.get("published_at") or entry["created_at"]
        day = stamp[:10] if stamp else "未知"
        grouped.setdefault(day, []).append(entry)

    today = datetime.now().strftime("%Y-%m-%d")
    out = ["### 📅 内容日历 (近 %d 天)\n" % days]

    for day in sorted(grouped):
        heading = f"**{day}**"
        if day == today:
            heading += " 📌 **今天**"
        out.append(heading)
        for entry in grouped[day]:
            icon = STATUS_LABELS.get(entry["status"], "❓")
            if entry.get("scheduled_time"):
                clock = f" ⏰{entry['scheduled_time'][11:16]}"
            elif entry.get("published_at"):
                clock = f" ✅{entry['published_at'][11:16]}"
            else:
                clock = ""
            headline = (entry.get("title") or "无标题")[:20]
            out.append(f" - {icon} #{entry['id']} {headline}{clock}")
        out.append("")

    return "\n".join(out)
|
||
|
||
def format_preview(self, item_id: int) -> str:
    """Render a detailed Markdown preview of one queue item.

    The preview contains the title, body, metadata (topic, style, tags, image
    count, persona), optional schedule / error / backup / SD-prompt lines,
    timestamps, and a section listing every image path. Images that exist on
    disk are embedded inline as base64 ``data:`` URIs.

    Returns:
        Markdown text, or a "not found" message when the item does not exist.
    """
    item = self.get(item_id)
    if not item:
        return "❌ 未找到该队列项"

    status_label = STATUS_LABELS.get(item["status"], item["status"])
    tags = item.get("tags") or []
    tags_str = " ".join(f"#{t}" for t in tags) if tags else "无标签"
    images = item.get("image_paths") or []
    sd_prompt = item.get("sd_prompt") or ""

    # `or` instead of a .get() default: the keys exist on every row, so the
    # placeholder must also apply when the stored value is empty.
    lines = [
        f"## {status_label} #{item['id']}",
        f"### 📌 {item.get('title') or '无标题'}",
        "",
        item.get("content") or "无正文",
        "",
        "---",
        f"**主题**: {item.get('topic') or '—'} · **风格**: {item.get('style') or '—'}",
        f"**标签**: {tags_str}",
        f"**图片**: {len(images)} 张",
        f"**人设**: {(item.get('persona') or '—')[:30]}",
    ]

    if item.get("scheduled_time"):
        lines.append(f"**排期**: {item['scheduled_time']}")
    if item.get("error_message"):
        lines.append(f"**错误**: ❌ {item['error_message']}")
    if item.get("backup_dir"):
        lines.append(f"**备份**: `{item['backup_dir']}`")
    if sd_prompt:
        ellipsis = "..." if len(sd_prompt) > 100 else ""
        lines.append(f"**SD提示词**: {sd_prompt[:100]}{ellipsis}")

    lines.extend([
        f"**创建**: {item.get('created_at') or '—'}",
        f"**更新**: {item.get('updated_at') or '—'}",
    ])
    if item.get("published_at"):
        lines.append(f"**发布**: {item['published_at']}")

    if images:
        # MIME type by file extension; anything unknown is served as JPEG.
        mime_types = {
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
            ".png": "image/png",
            ".gif": "image/gif",
            ".webp": "image/webp",
            ".bmp": "image/bmp",
        }
        lines.append("\n### 📷 生成的图片\n")
        for index, img in enumerate(images, start=1):
            exists = os.path.exists(img)
            status_icon = "✅" if exists else "❌"
            abs_path = os.path.abspath(img) if exists else img
            lines.append(f"{index}. {status_icon} `{abs_path}`")

            if not exists:
                lines.append(" ⚠️ 图片文件不存在")
            else:
                try:
                    with open(abs_path, "rb") as f:
                        img_data = f.read()
                except OSError as e:
                    lines.append(f" ⚠️ 无法读取图片: {e}")
                else:
                    ext = os.path.splitext(abs_path)[1].lower()
                    mime_type = mime_types.get(ext, "image/jpeg")
                    encoded = base64.b64encode(img_data).decode("ascii")
                    data_uri = f"data:{mime_type};base64,{encoded}"
                    # Emit the image itself; without this line the data URI
                    # was built but never shown.
                    lines.append(f" ![图片 {index}]({data_uri})")
                    lines.append(f" <small>图片大小: {len(img_data) / 1024:.1f} KB</small>")
            lines.append("")

    return "\n".join(lines)
|
||
|
||
# ---------- 智能排期引擎 ----------
|
||
|
||
def get_slot_usage(self, days: int = 7) -> dict:
    """Count scheduled items per date and time slot for the next *days* days.

    Only ``scheduled`` / ``approved`` items whose ``scheduled_time`` lies
    between now and ``now + days`` are counted. Rows whose time cannot be
    parsed are skipped.

    Returns:
        ``{"2026-02-28": {"18-21时": 1, "08-11时": 2}, ...}``
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"
    conn = self._get_conn()
    try:
        window_start = datetime.now()
        window_end = window_start + timedelta(days=days)
        records = conn.execute(
            "SELECT scheduled_time FROM queue "
            "WHERE status IN (?, ?) AND scheduled_time IS NOT NULL AND scheduled_time >= ? AND scheduled_time <= ?",
            (STATUS_SCHEDULED, STATUS_APPROVED,
             window_start.strftime(stamp_format), window_end.strftime(stamp_format)),
        ).fetchall()

        usage: dict[str, dict[str, int]] = {}
        for record in records:
            raw = record["scheduled_time"]
            if not raw:
                continue
            try:
                when = datetime.strptime(raw[:19], stamp_format)
            except (ValueError, TypeError):
                continue
            per_day = usage.setdefault(when.strftime("%Y-%m-%d"), {})
            # Bucket the hour into its time-slot label.
            label = self._hour_to_slot(when.hour)
            per_day[label] = per_day.get(label, 0) + 1
        return usage
    finally:
        conn.close()
|
||
|
||
@staticmethod
|
||
def _hour_to_slot(hour: int) -> str:
|
||
"""将小时映射到时段标签。"""
|
||
brackets = [
|
||
(0, 3, "00-03时"), (3, 6, "03-06时"), (6, 8, "06-08时"),
|
||
(8, 11, "08-11时"), (11, 12, "11-12时"), (12, 14, "12-14时"),
|
||
(14, 18, "14-18时"), (18, 21, "18-21时"), (21, 24, "21-24时"),
|
||
]
|
||
for lo, hi, label in brackets:
|
||
if lo <= hour < hi:
|
||
return label
|
||
return "21-24时"
|
||
|
||
@staticmethod
|
||
def _slot_to_hour_range(slot: str) -> tuple[int, int]:
|
||
"""从时段标签提取起止小时 (start, end)。"""
|
||
import re as _re
|
||
m = _re.match(r"(\d{2})-(\d{2})时", slot)
|
||
if m:
|
||
return int(m.group(1)), int(m.group(2))
|
||
return 18, 21 # fallback
|
||
|
||
def suggest_schedule_time(self, analytics, max_per_slot: int = 2,
|
||
max_per_day: int = 5) -> str | None:
|
||
"""基于时段权重和已有排期,计算最优发布时间。
|
||
|
||
返回格式: '%Y-%m-%d %H:%M:%S',所有时段满时返回 None。
|
||
"""
|
||
import random as _random
|
||
|
||
time_weights = analytics.get_time_weights()
|
||
if not time_weights:
|
||
return None
|
||
|
||
# 按权重降序排列候选时段
|
||
sorted_slots = sorted(time_weights.items(),
|
||
key=lambda x: x[1] if isinstance(x[1], (int, float)) else x[1].get("weight", 0),
|
||
reverse=True)
|
||
|
||
usage = self.get_slot_usage(days=7)
|
||
now = datetime.now()
|
||
|
||
for day_offset in range(8): # 今天 + 未来7天
|
||
target_date = now + timedelta(days=day_offset)
|
||
date_key = target_date.strftime("%Y-%m-%d")
|
||
|
||
# 检查当天总量
|
||
day_usage = usage.get(date_key, {})
|
||
day_total = sum(day_usage.values())
|
||
if day_total >= max_per_day:
|
||
continue
|
||
|
||
for slot_name, slot_info in sorted_slots:
|
||
slot_count = day_usage.get(slot_name, 0)
|
||
if slot_count >= max_per_slot:
|
||
continue
|
||
|
||
start_hour, end_hour = self._slot_to_hour_range(slot_name)
|
||
|
||
# 如果是今天,跳过已过去的时段
|
||
if day_offset == 0 and end_hour <= now.hour:
|
||
continue
|
||
# 如果是今天且时段正在进行中,起始小时调整为当前 +1
|
||
effective_start = start_hour
|
||
if day_offset == 0 and start_hour <= now.hour < end_hour:
|
||
effective_start = now.hour + 1
|
||
if effective_start >= end_hour:
|
||
continue
|
||
|
||
# 在时段内随机选一个时间
|
||
rand_hour = _random.randint(effective_start, end_hour - 1)
|
||
rand_minute = _random.randint(0, 59)
|
||
scheduled = target_date.replace(hour=rand_hour, minute=rand_minute,
|
||
second=0, microsecond=0)
|
||
return scheduled.strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
return None
|
||
|
||
def auto_schedule_item(self, item_id: int, analytics,
                       max_per_slot: int = 2, max_per_day: int = 5) -> bool:
    """Assign a suggested publish time to a draft or approved item.

    The time comes from ``suggest_schedule_time``. On success the item's
    ``scheduled_time`` is set and its status becomes ``scheduled``.

    Returns:
        ``True`` when the item was scheduled; ``False`` when it does not
        exist, is in another state, or no slot is available.
    """
    item = self.get(item_id)
    if not item:
        return False
    if item["status"] not in (STATUS_DRAFT, STATUS_APPROVED):
        return False

    slot_time = self.suggest_schedule_time(
        analytics, max_per_slot=max_per_slot, max_per_day=max_per_day,
    )
    if not slot_time:
        logger.warning("auto_schedule_item #%d: 未来7天无可用时段", item_id)
        return False

    # Persist the schedule time together with the new status.
    conn = self._get_conn()
    try:
        stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        conn.execute(
            "UPDATE queue SET status = ?, scheduled_time = ?, updated_at = ? WHERE id = ?",
            (STATUS_SCHEDULED, slot_time, stamp, item_id),
        )
        conn.commit()
        logger.info("auto_schedule_item #%d → %s", item_id, slot_time)
        return True
    finally:
        conn.close()
|
||
|
||
|
||
class QueuePublisher:
|
||
"""后台队列发布处理器"""
|
||
|
||
def __init__(self, queue: PublishQueue):
|
||
self.queue = queue
|
||
self._running = threading.Event()
|
||
self._thread = None
|
||
self._publish_fn = None # 由外部注册的发布回调
|
||
self._log_fn = None # 日志回调
|
||
|
||
def set_publish_callback(self, fn):
|
||
"""注册发布回调: fn(item: dict) -> (success: bool, message: str)"""
|
||
self._publish_fn = fn
|
||
|
||
def set_log_callback(self, fn):
|
||
"""注册日志回调: fn(msg: str)"""
|
||
self._log_fn = fn
|
||
|
||
def _log(self, msg: str):
    """Log *msg* through the module logger and the optional log callback.

    The callback is best effort: an exception raised by it never reaches the
    caller, but it is recorded at debug level instead of vanishing silently.
    """
    logger.info(msg)
    if not self._log_fn:
        return
    try:
        self._log_fn(msg)
    except Exception:
        logger.debug("log callback raised; message was still logged", exc_info=True)
|
||
|
||
def start(self, check_interval: int = 60):
|
||
"""启动后台队列处理"""
|
||
if self._running.is_set():
|
||
return
|
||
self._running.set()
|
||
self._thread = threading.Thread(
|
||
target=self._loop, args=(check_interval,), daemon=True
|
||
)
|
||
self._thread.start()
|
||
self._log("📋 发布队列处理器已启动")
|
||
|
||
def stop(self):
|
||
"""停止队列处理"""
|
||
self._running.clear()
|
||
self._log("📋 发布队列处理器已停止")
|
||
|
||
@property
|
||
def is_running(self) -> bool:
|
||
return self._running.is_set()
|
||
|
||
def _loop(self, interval: int):
|
||
while self._running.is_set():
|
||
try:
|
||
self._process_pending()
|
||
except Exception as e:
|
||
self._log(f"❌ 队列处理异常: {e}")
|
||
logger.error("队列处理异常: %s", e, exc_info=True)
|
||
# 等待,但可中断
|
||
for _ in range(interval):
|
||
if not self._running.is_set():
|
||
break
|
||
time.sleep(1)
|
||
|
||
def _process_pending(self):
    """Publish every item that is currently due.

    For each item returned by ``queue.get_pending_publish()``:

    1. The item is claimed by switching it to ``publishing``, but only if it
       is still ``approved`` / ``scheduled``. Items rejected, deleted or
       picked up elsewhere since the list was read are skipped.
    2. The publish callback is invoked with the item dict.
    3. On success the item becomes ``published``. On a reported failure it
       goes back to ``approved`` while the retry counter is within
       ``MAX_RETRIES``, otherwise to ``failed``. An exception marks it
       ``failed`` straight away.

    A random 5-15 second pause separates two publishes. Both the loop and
    the pause stop early once the running flag is cleared.
    """
    import random

    if not self._publish_fn:
        return

    pending = self.queue.get_pending_publish()
    if not pending:
        return

    last_index = len(pending) - 1
    for index, item in enumerate(pending):
        if not self._running.is_set():
            break

        item_id = item["id"]
        title = (item.get("title") or "")[:20]

        # Claim the item atomically; the snapshot may be stale by now.
        conn = self.queue._get_conn()
        try:
            cursor = conn.execute(
                "UPDATE queue SET status = ?, updated_at = ? "
                "WHERE id = ? AND status IN (?, ?)",
                (STATUS_PUBLISHING, datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                 item_id, STATUS_APPROVED, STATUS_SCHEDULED),
            )
            conn.commit()
            claimed = cursor.rowcount > 0
        finally:
            conn.close()
        if not claimed:
            continue

        self._log(f"📋 队列发布 #{item_id}: {title}")

        try:
            success, message = self._publish_fn(item)
            if success:
                self.queue.update_status(item_id, STATUS_PUBLISHED, publish_result=message)
                self._log(f"✅ 队列发布成功 #{item_id}: {title}")
            else:
                retry_count = (item.get("retry_count") or 0) + 1
                if retry_count <= MAX_RETRIES:
                    # Retries left: back to approved for the next pass.
                    # Status, error text and counter go in one statement so
                    # the counter cannot be lost between two writes.
                    conn = self.queue._get_conn()
                    try:
                        conn.execute(
                            "UPDATE queue SET status = ?, error_message = ?, "
                            "retry_count = ?, updated_at = ? WHERE id = ?",
                            (STATUS_APPROVED, f"第{retry_count}次失败: {message}",
                             retry_count,
                             datetime.now().strftime("%Y-%m-%d %H:%M:%S"), item_id),
                        )
                        conn.commit()
                    finally:
                        conn.close()
                    self._log(f"⚠️ #{item_id} 发布失败 (重试 {retry_count}/{MAX_RETRIES}): {message}")
                else:
                    self.queue.update_status(item_id, STATUS_FAILED, error_message=message)
                    self._log(f"❌ #{item_id} 发布失败已达重试上限: {message}")
        except Exception as e:
            self.queue.update_status(item_id, STATUS_FAILED, error_message=str(e))
            self._log(f"❌ #{item_id} 发布异常: {e}")

        # Pause between publishes (mimics a human); pointless after the last item.
        if index < last_index:
            wait = random.randint(5, 15)
            self._log(f"⏳ 等待 {wait}s 后处理下一项...")
            for _ in range(wait):
                if not self._running.is_set():
                    break
                time.sleep(1)
|
||
|
||
def publish_now(self, item_id: int) -> str:
    """Publish one item immediately, bypassing the background loop.

    The item must be ``approved``, ``scheduled`` or ``failed``. It is claimed
    with a conditional update before the callback runs, so the same item is
    never published a second time by the background worker in parallel.

    Returns:
        A human-readable result message (success, failure or the reason the
        item could not be published).
    """
    if not self._publish_fn:
        return "❌ 发布回调未注册"

    item = self.queue.get(item_id)
    if not item:
        return "❌ 未找到队列项"
    if item["status"] not in (STATUS_APPROVED, STATUS_SCHEDULED, STATUS_FAILED):
        return f"❌ 当前状态 [{STATUS_LABELS.get(item['status'], item['status'])}] 不可发布"

    # Claim atomically: the status may have changed since it was read above.
    conn = self.queue._get_conn()
    try:
        cursor = conn.execute(
            "UPDATE queue SET status = ?, updated_at = ? "
            "WHERE id = ? AND status IN (?, ?, ?)",
            (STATUS_PUBLISHING, datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
             item_id, STATUS_APPROVED, STATUS_SCHEDULED, STATUS_FAILED),
        )
        conn.commit()
        claimed = cursor.rowcount > 0
    finally:
        conn.close()
    if not claimed:
        current = self.queue.get(item_id)
        if not current:
            return "❌ 未找到队列项"
        return f"❌ 当前状态 [{STATUS_LABELS.get(current['status'], current['status'])}] 不可发布"

    try:
        success, message = self._publish_fn(item)
        if success:
            self.queue.update_status(item_id, STATUS_PUBLISHED, publish_result=message)
            return f"✅ 发布成功: {message}"
        self.queue.update_status(item_id, STATUS_FAILED, error_message=message)
        return f"❌ 发布失败: {message}"
    except Exception as e:
        self.queue.update_status(item_id, STATUS_FAILED, error_message=str(e))
        return f"❌ 发布异常: {e}"
|