feat(automation): 新增自动化运营防重复机制与统计功能

- 新增操作历史记录,防止对同一笔记重复评论、点赞、收藏和回复
- 新增每日操作统计与限额管理,包含评论、点赞、收藏、发布和回复的独立上限
- 新增错误冷却机制,连续错误后自动暂停操作一段时间
- 新增运营时段控制,允许设置每日自动运营的开始和结束时间
- 新增收藏功能,支持一键收藏和定时自动收藏
- 新增随机人设池,提供25种预设小红书博主风格人设,支持随机切换
- 扩充主题池、风格池和评论关键词池,增加运营多样性
- 优化自动化调度器,显示下次执行时间和实时统计摘要
- 优化发布功能,增加本地备份机制,失败时保留文案和图片

🐛 fix(llm): 修复绘图提示词中的人物特征要求

- 在绘图提示词模板中明确要求人物必须是东亚面孔的中国人
- 添加具体的人物特征描述,如黑发、深棕色眼睛、精致五官等
- 禁止出现西方人或欧美人特征
- 调整整体画面风格偏向东方审美、清新淡雅和小红书风格
This commit is contained in:
zhoujie 2026-02-09 20:50:05 +08:00
parent 087d23f3fb
commit dbe695b551
3 changed files with 578 additions and 79 deletions

View File

@ -27,11 +27,13 @@ PROMPT_COPYWRITING = """
绘图 Prompt
生成对应的 Stable Diffusion 英文提示词适配 JuggernautXL 模型强调
- 人物要求最重要如果画面中有人物必须是东亚面孔的中国人使用 asian girl/boy, chinese, east asian features, black hair, dark brown eyes, delicate facial features, fair skin, slim figure 等描述绝对禁止出现西方人/欧美人特征
- 质量词masterpiece, best quality, ultra detailed, 8k uhd, high resolution
- 光影natural lighting, soft shadows, studio lighting, golden hour 根据场景选择
- 风格photorealistic, cinematic, editorial photography, ins style
- 风格photorealistic, cinematic, editorial photography, ins style, chinese social media aesthetic
- 构图dynamic angle, depth of field, bokeh
- 细节detailed skin texture, sharp focus, vivid colors
- 审美偏向整体画面风格偏向东方审美清新淡雅小红书风格
注意不要使用括号权重语法直接用英文逗号分隔描述
返回 JSON 格式
@ -112,9 +114,11 @@ PROMPT_COPY_WITH_REFERENCE = """
绘图 Prompt
生成 Stable Diffusion 英文提示词适配 JuggernautXL 模型
- 人物要求最重要如果画面中有人物必须是东亚面孔的中国人使用 asian girl/boy, chinese, east asian features, black hair, dark brown eyes, delicate facial features, fair skin, slim figure 等描述绝对禁止出现西方人/欧美人特征
- 必含质量词masterpiece, best quality, ultra detailed, 8k uhd
- 风格photorealistic, cinematic, editorial photography
- 风格photorealistic, cinematic, editorial photography, chinese social media aesthetic
- 光影和细节natural lighting, sharp focus, vivid colors, detailed skin texture
- 审美偏向整体画面风格偏向东方审美清新淡雅小红书风格
- 用英文逗号分隔不用括号权重语法
返回 JSON 格式

625
main.py
View File

@ -576,6 +576,7 @@ def load_note_for_comment(feed_id, xsec_token, mcp_url):
def ai_generate_comment(model, persona,
post_title, post_content, existing_comments):
"""AI 生成主动评论"""
persona = _resolve_persona(persona)
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "⚠️ 请先配置 LLM 提供商", "❌ LLM 未配置"
@ -703,6 +704,7 @@ def fetch_my_note_comments(feed_id, xsec_token, mcp_url):
def ai_reply_comment(model, persona, post_title, comment_text):
"""AI 生成评论回复"""
persona = _resolve_persona(persona)
api_key, base_url, _ = _get_llm_config()
if not api_key:
return "⚠️ 请先配置 LLM 提供商", "❌ LLM 未配置"
@ -893,16 +895,195 @@ _auto_running = threading.Event()
_auto_thread: threading.Thread | None = None
_auto_log: list[str] = []
DEFAULT_TOPICS = [
"春季穿搭", "通勤穿搭", "约会穿搭", "显瘦穿搭", "平价好物",
"护肤心得", "妆容教程", "好物分享", "生活好物", "减脂餐分享",
"居家好物", "收纳技巧", "咖啡探店", "书单推荐", "旅行攻略",
# ---- 操作记录:防重复 & 每日统计 ----
_op_history = {
"commented_feeds": set(), # 已评论的 feed_id
"replied_comments": set(), # 已回复的 comment_id
"liked_feeds": set(), # 已点赞的 feed_id
"favorited_feeds": set(), # 已收藏的 feed_id
}
_daily_stats = {
"date": "",
"comments": 0,
"likes": 0,
"favorites": 0,
"publishes": 0,
"replies": 0,
"errors": 0,
}
# 每日操作上限
DAILY_LIMITS = {
"comments": 30,
"likes": 80,
"favorites": 50,
"publishes": 8,
"replies": 40,
}
# 连续错误计数 → 冷却
_consecutive_errors = 0
_error_cooldown_until = 0.0
def _reset_daily_stats_if_needed():
    """Roll the daily counters (and de-dup history) over to a fresh day.

    No-op while the stored date still matches today; otherwise zeroes every
    counter and clears the per-day operation history so the same notes and
    comments may be interacted with again on a later day.
    """
    current_day = datetime.now().strftime("%Y-%m-%d")
    if _daily_stats["date"] == current_day:
        return
    fresh = dict.fromkeys(
        ("comments", "likes", "favorites", "publishes", "replies", "errors"), 0
    )
    fresh["date"] = current_day
    _daily_stats.update(fresh)
    # Clearing history daily deliberately re-allows cross-day interactions.
    for bucket in _op_history.values():
        bucket.clear()
def _check_daily_limit(op_type: str) -> bool:
    """Return True while today's count for *op_type* is still below its cap.

    Unknown operation types get a generous default cap of 999.
    """
    _reset_daily_stats_if_needed()
    cap = DAILY_LIMITS.get(op_type, 999)
    return _daily_stats.get(op_type, 0) < cap
def _increment_stat(op_type: str):
    """Bump today's counter for *op_type*, creating it if absent."""
    _reset_daily_stats_if_needed()
    _daily_stats[op_type] = 1 + _daily_stats.get(op_type, 0)
def _record_error():
    """Count one failure; three consecutive failures start a cooldown.

    The cooldown grows linearly with the streak (60 s per consecutive
    error) and is capped at 10 minutes.
    """
    global _consecutive_errors, _error_cooldown_until
    _consecutive_errors += 1
    _daily_stats["errors"] = _daily_stats.get("errors", 0) + 1
    if _consecutive_errors < 3:
        return
    pause = min(600, 60 * _consecutive_errors)  # cap at 10 minutes
    _error_cooldown_until = time.time() + pause
    _auto_log_append(f"⚠️ 连续 {_consecutive_errors} 次错误,冷却 {pause}s")
def _clear_error_streak():
    """Reset the consecutive-error counter after any successful operation."""
    global _consecutive_errors
    _consecutive_errors = 0
def _is_in_cooldown() -> bool:
    """True while the error-cooldown deadline still lies in the future."""
    return _error_cooldown_until > time.time()
def _is_in_operating_hours(start_hour: int = 7, end_hour: int = 23) -> bool:
"""检查是否在运营时间段"""
now_hour = datetime.now().hour
return start_hour <= now_hour < end_hour
def _get_stats_summary() -> str:
    """Render today's operation counters as a Markdown summary string."""
    _reset_daily_stats_if_needed()
    stats = _daily_stats
    # (label, counter key) pairs, rendered as "count/limit" rows.
    rows = (
        ("💬 评论", "comments"),
        ("❤️ 点赞", "likes"),
        ("⭐ 收藏", "favorites"),
        ("🚀 发布", "publishes"),
        ("💌 回复", "replies"),
    )
    report = [f"📊 **今日运营统计** ({stats['date']})"]
    report.extend(
        f"- {label}: {stats[key]}/{DAILY_LIMITS[key]}" for label, key in rows
    )
    # Errors have no daily limit, so no "/cap" suffix.
    report.append(f"- ❌ 错误: {stats['errors']}")
    return "\n".join(report)
# ================= 人设池 =================
DEFAULT_PERSONAS = [
"温柔知性的时尚博主,喜欢分享日常穿搭和生活美学",
"元气满满的大学生,热爱探店和平价好物分享",
"30岁都市白领丽人专注通勤穿搭和职场干货",
"精致妈妈,分享育儿经验和家居收纳技巧",
"文艺青年摄影师,喜欢记录旅行和城市角落",
"健身达人营养师,专注减脂餐和运动分享",
"资深美妆博主,擅长化妆教程和护肤测评",
"独居女孩,分享租房改造和独居生活仪式感",
"甜品烘焙爱好者,热衷分享自制甜点和下午茶",
"数码科技女生专注好用App和电子产品测评",
"小镇姑娘在大城市打拼,分享省钱攻略和成长日记",
"中医养生爱好者,分享节气养生和食疗方子",
"二次元coser喜欢分享cos日常和动漫周边",
"北漂程序媛,分享高效工作法和解压生活",
"复古穿搭博主热爱vintage风和中古饰品",
"考研上岸学姐,分享学习方法和备考经验",
"新手养猫人,记录和毛孩子的日常生活",
"咖啡重度爱好者,探遍城市独立咖啡馆",
"极简主义生活家,倡导断舍离和高质量生活",
"汉服爱好者,分享传统文化和国风穿搭",
"插画师小姐姐,分享手绘过程和创作灵感",
"海归女孩,分享中西文化差异和海外生活见闻",
"瑜伽老师,分享身心灵修行和自律生活",
"美甲设计师,分享流行甲型和美甲教程",
"家居软装设计师,分享小户型改造和氛围感布置",
]
DEFAULT_STYLES = ["好物种草", "干货教程", "情绪共鸣", "生活Vlog", "测评避雷"]
RANDOM_PERSONA_LABEL = "🎲 随机人设(每次自动切换)"
# ================= 主题池 =================
DEFAULT_TOPICS = [
# 穿搭类
"春季穿搭", "通勤穿搭", "约会穿搭", "显瘦穿搭", "小个子穿搭",
"学生党穿搭", "韩系穿搭", "日系穿搭", "法式穿搭", "极简穿搭",
"国风穿搭", "运动穿搭", "闺蜜穿搭", "梨形身材穿搭", "微胖穿搭",
"氛围感穿搭", "一衣多穿", "秋冬叠穿", "夏日清凉穿搭",
# 美妆护肤类
"护肤心得", "妆容教程", "学生党平价护肤", "敏感肌护肤",
"抗老护肤", "美白攻略", "眼妆教程", "唇妆合集", "底妆测评",
"防晒测评", "早C晚A护肤", "成分党护肤", "换季护肤",
# 美食类
"减脂餐分享", "一人食食谱", "宿舍美食", "烘焙教程", "家常菜做法",
"探店打卡", "咖啡探店", "早餐食谱", "下午茶推荐", "火锅推荐",
"奶茶测评", "便当制作", "0失败甜品",
# 生活家居类
"好物分享", "平价好物", "居家好物", "收纳技巧", "租房改造",
"小户型装修", "氛围感房间", "香薰推荐", "桌面布置", "断舍离",
# 旅行出行类
"旅行攻略", "周末去哪玩", "小众旅行地", "拍照打卡地", "露营攻略",
"自驾游攻略", "古镇旅行", "海岛度假", "城市citywalk",
# 学习成长类
"书单推荐", "自律生活", "时间管理", "考研经验", "英语学习方法",
"理财入门", "副业分享", "简历优化", "面试技巧",
# 数码科技类
"iPad生产力", "手机摄影技巧", "好用App推荐", "电子产品测评",
# 健身运动类
"居家健身", "帕梅拉跟练", "跑步入门", "瑜伽入门", "体态矫正",
# 宠物类
"养猫日常", "养狗经验", "宠物好物", "新手养宠指南",
# 情感心理类
"独居生活", "emo急救指南", "社恐自救", "女性成长", "情绪管理",
]
DEFAULT_STYLES = [
"好物种草", "干货教程", "情绪共鸣", "生活Vlog", "测评避雷",
"知识科普", "经验分享", "清单合集", "对比测评", "沉浸式体验",
]
# ================= 评论关键词池 =================
DEFAULT_COMMENT_KEYWORDS = [
"穿搭", "美食", "护肤", "好物推荐", "旅行", "生活日常", "减脂",
# 穿搭时尚
"穿搭", "ootd", "早春穿搭", "通勤穿搭", "显瘦", "小个子穿搭",
# 美妆护肤
"护肤", "化妆教程", "平价护肤", "防晒", "美白", "眼影",
# 美食
"美食", "减脂餐", "探店", "咖啡", "烘焙", "食谱",
# 生活好物
"好物推荐", "平价好物", "居家好物", "收纳", "租房改造",
# 旅行
"旅行", "攻略", "打卡", "周末去哪玩", "露营",
# 学习成长
"自律", "书单", "考研", "英语学习", "副业",
# 生活日常
"生活日常", "独居", "vlog", "仪式感", "解压",
# 健身
"减脂", "健身", "瑜伽", "体态",
# 宠物
"养猫", "养狗", "宠物",
]
@ -916,6 +1097,16 @@ def _auto_log_append(msg: str):
logger.info("[自动化] %s", msg)
def _resolve_persona(persona_text: str) -> str:
    """Map the UI persona selection to a concrete persona string.

    An empty value or the special "random" label picks one persona at
    random from the pool (logging the pick); anything else — a dropdown
    choice or a user-typed custom persona — passes through untouched.
    """
    if persona_text and persona_text != RANDOM_PERSONA_LABEL:
        # Concrete persona selected in the dropdown (or custom text).
        return persona_text
    chosen = random.choice(DEFAULT_PERSONAS)
    _auto_log_append(f"🎭 本次人设: {chosen[:20]}...")
    return chosen
def _auto_comment_with_log(keywords_str, mcp_url, model, persona_text):
"""一键评论 + 同步刷新日志"""
msg = auto_comment_once(keywords_str, mcp_url, model, persona_text)
@ -923,45 +1114,60 @@ def _auto_comment_with_log(keywords_str, mcp_url, model, persona_text):
def auto_comment_once(keywords_str, mcp_url, model, persona_text):
"""一键评论:自动搜索高赞笔记 → AI生成评论 → 发送"""
"""一键评论:自动搜索高赞笔记 → AI生成评论 → 发送(含防重复/限额/冷却)"""
try:
if _is_in_cooldown():
return "⏳ 错误冷却中,请稍后再试"
if not _check_daily_limit("comments"):
return f"🚫 今日评论已达上限 ({DAILY_LIMITS['comments']})"
persona_text = _resolve_persona(persona_text)
keywords = [k.strip() for k in keywords_str.split(",") if k.strip()] if keywords_str else DEFAULT_COMMENT_KEYWORDS
keyword = random.choice(keywords)
_auto_log_append(f"🔍 搜索关键词: {keyword}")
client = get_mcp_client(mcp_url)
# 随机切换搜索排序,丰富互动对象
sort_options = ["最多点赞", "综合", "最新"]
sort_by = random.choice(sort_options)
# 搜索高赞笔记
entries = client.search_feeds_parsed(keyword, sort_by="最多点赞")
entries = client.search_feeds_parsed(keyword, sort_by=sort_by)
if not entries:
_auto_log_append("⚠️ 搜索无结果,尝试推荐列表")
entries = client.list_feeds_parsed()
if not entries:
_record_error()
return "❌ 未找到任何笔记"
# 过滤掉自己的笔记
# 过滤掉自己的笔记 & 已评论过的笔记
my_uid = cfg.get("my_user_id", "")
if my_uid:
filtered = [e for e in entries if e.get("user_id") != my_uid]
if filtered:
entries = filtered
entries = [
e for e in entries
if e.get("user_id") != my_uid
and e.get("feed_id") not in _op_history["commented_feeds"]
]
if not entries:
return " 搜索结果中所有笔记都已评论过,换个关键词试试"
# 从前10个中随机选择
target = random.choice(entries[:min(10, len(entries))])
feed_id = target["feed_id"]
xsec_token = target["xsec_token"]
title = target.get("title", "未知")
_auto_log_append(f"🎯 选中: {title[:30]} (@{target.get('author', '未知')})")
_auto_log_append(f"🎯 选中: {title[:30]} (@{target.get('author', '未知')}) [排序:{sort_by}]")
if not feed_id or not xsec_token:
return "❌ 笔记缺少必要参数 (feed_id/xsec_token)"
# 模拟浏览延迟
time.sleep(random.uniform(2, 5))
time.sleep(random.uniform(3, 8))
# 加载笔记详情
result = client.get_feed_detail(feed_id, xsec_token, load_all_comments=True)
if "error" in result:
_record_error()
return f"❌ 加载笔记失败: {result['error']}"
full_text = result.get("text", "")
@ -985,24 +1191,26 @@ def auto_comment_once(keywords_str, mcp_url, model, persona_text):
_auto_log_append(f"💬 生成评论: {comment[:60]}...")
# 随机等待后发送
time.sleep(random.uniform(3, 8))
time.sleep(random.uniform(3, 10))
result = client.post_comment(feed_id, xsec_token, comment)
resp_text = result.get("text", "")
_auto_log_append(f"📡 MCP 响应: {resp_text[:200]}")
if "error" in result:
_record_error()
_auto_log_append(f"❌ 评论发送失败: {result['error']}")
return f"❌ 评论发送失败: {result['error']}"
# 检查是否真正成功
if "成功" not in resp_text and "success" not in resp_text.lower() and not resp_text:
_auto_log_append(f"⚠️ 评论可能未成功MCP 原始响应: {result}")
return f"⚠️ 评论状态不确定,请手动检查\nMCP 响应: {resp_text[:300]}\n📝 评论: {comment}"
# 记录成功操作
_op_history["commented_feeds"].add(feed_id)
_increment_stat("comments")
_clear_error_streak()
_auto_log_append(f"✅ 评论已发送到「{title[:20]}")
return f"✅ 已评论「{title[:25]}\n📝 评论: {comment}\n\n💡 小红书可能有内容审核延迟,请稍等 1-2 分钟后查看"
_auto_log_append(f"✅ 评论已发送到「{title[:20]} (今日第{_daily_stats['comments']}条)")
return f"✅ 已评论「{title[:25]}\n📝 评论: {comment}\n📊 今日评论: {_daily_stats['comments']}/{DAILY_LIMITS['comments']}"
except Exception as e:
_record_error()
_auto_log_append(f"❌ 一键评论异常: {e}")
return f"❌ 评论失败: {e}"
@ -1014,11 +1222,19 @@ def _auto_like_with_log(keywords_str, like_count, mcp_url):
def auto_like_once(keywords_str, like_count, mcp_url):
"""一键点赞:搜索/推荐笔记 → 随机选择 → 批量点赞"""
"""一键点赞:搜索/推荐笔记 → 随机选择 → 批量点赞(含防重复/限额)"""
try:
if _is_in_cooldown():
return "⏳ 错误冷却中,请稍后再试"
if not _check_daily_limit("likes"):
return f"🚫 今日点赞已达上限 ({DAILY_LIMITS['likes']})"
keywords = [k.strip() for k in keywords_str.split(",") if k.strip()] if keywords_str else DEFAULT_COMMENT_KEYWORDS
keyword = random.choice(keywords)
like_count = int(like_count) if like_count else 5
# 不超过当日剩余额度
remaining = DAILY_LIMITS["likes"] - _daily_stats.get("likes", 0)
like_count = min(like_count, remaining)
_auto_log_append(f"👍 点赞关键词: {keyword} | 目标: {like_count}")
client = get_mcp_client(mcp_url)
@ -1029,14 +1245,18 @@ def auto_like_once(keywords_str, like_count, mcp_url):
_auto_log_append("⚠️ 搜索无结果,尝试推荐列表")
entries = client.list_feeds_parsed()
if not entries:
_record_error()
return "❌ 未找到任何笔记"
# 过滤自己的笔记
# 过滤自己的笔记 & 已点赞过的
my_uid = cfg.get("my_user_id", "")
if my_uid:
filtered = [e for e in entries if e.get("user_id") != my_uid]
if filtered:
entries = filtered
entries = [
e for e in entries
if e.get("user_id") != my_uid
and e.get("feed_id") not in _op_history["liked_feeds"]
]
if not entries:
return " 搜索结果中所有笔记都已点赞过"
# 随机打乱,取前 N 个
random.shuffle(entries)
@ -1059,16 +1279,94 @@ def auto_like_once(keywords_str, like_count, mcp_url):
_auto_log_append(f" ❌ 点赞失败「{title}」: {result['error']}")
else:
liked += 1
_op_history["liked_feeds"].add(feed_id)
_increment_stat("likes")
_auto_log_append(f" ❤️ 已点赞「{title}」@{target.get('author', '未知')}")
_auto_log_append(f"👍 点赞完成: 成功 {liked}/{len(targets)}")
return f"✅ 点赞完成!成功 {liked}/{len(targets)}"
if liked > 0:
_clear_error_streak()
_auto_log_append(f"👍 点赞完成: 成功 {liked}/{len(targets)} (今日累计{_daily_stats.get('likes', 0)})")
return f"✅ 点赞完成!成功 {liked}/{len(targets)}\n📊 今日点赞: {_daily_stats.get('likes', 0)}/{DAILY_LIMITS['likes']}"
except Exception as e:
_record_error()
_auto_log_append(f"❌ 一键点赞异常: {e}")
return f"❌ 点赞失败: {e}"
def _auto_favorite_with_log(keywords_str, fav_count, mcp_url):
    """Run one favorite pass and also return the refreshed log text."""
    status = auto_favorite_once(keywords_str, fav_count, mcp_url)
    return status, get_auto_log()
def auto_favorite_once(keywords_str, fav_count, mcp_url):
    """Favorite a batch of notes found via a randomly chosen keyword.

    Flow: pick a keyword → search (sorted by most-favorited, falling back
    to the recommendation feed) → filter out own / already-favorited
    notes → favorite up to ``fav_count`` of them, honoring the daily
    favorites limit and the error-cooldown window.

    Args:
        keywords_str: Comma-separated keyword pool; falls back to
            DEFAULT_COMMENT_KEYWORDS when empty.
        fav_count: Target number of notes to favorite this run
            (defaults to 3 when falsy).
        mcp_url: Endpoint of the MCP service used for search/favorite.

    Returns:
        A human-readable status string for the UI.
    """
    try:
        # Bail out early under error cooldown or when today's cap is hit.
        if _is_in_cooldown():
            return "⏳ 错误冷却中,请稍后再试"
        if not _check_daily_limit("favorites"):
            return f"🚫 今日收藏已达上限 ({DAILY_LIMITS['favorites']})"
        keywords = [k.strip() for k in keywords_str.split(",") if k.strip()] if keywords_str else DEFAULT_COMMENT_KEYWORDS
        keyword = random.choice(keywords)
        fav_count = int(fav_count) if fav_count else 3
        # Never exceed what is left of today's quota.
        remaining = DAILY_LIMITS["favorites"] - _daily_stats.get("favorites", 0)
        fav_count = min(fav_count, remaining)
        _auto_log_append(f"⭐ 收藏关键词: {keyword} | 目标: {fav_count}")
        client = get_mcp_client(mcp_url)
        entries = client.search_feeds_parsed(keyword, sort_by="最多收藏")
        if not entries:
            # Empty search → fall back to the recommendation feed.
            entries = client.list_feeds_parsed()
        if not entries:
            _record_error()
            return "❌ 未找到任何笔记"
        # Skip our own notes and notes already favorited today.
        my_uid = cfg.get("my_user_id", "")
        entries = [
            e for e in entries
            if e.get("user_id") != my_uid
            and e.get("feed_id") not in _op_history["favorited_feeds"]
        ]
        if not entries:
            return " 搜索结果中所有笔记都已收藏过"
        random.shuffle(entries)
        targets = entries[:min(fav_count, len(entries))]
        saved = 0
        for target in targets:
            feed_id = target.get("feed_id", "")
            xsec_token = target.get("xsec_token", "")
            title = target.get("title", "未知")[:25]
            # Both IDs are required by the favorite endpoint.
            if not feed_id or not xsec_token:
                continue
            # Human-like pacing between consecutive favorite calls.
            time.sleep(random.uniform(2, 6))
            result = client.favorite_feed(feed_id, xsec_token)
            if "error" in result:
                _auto_log_append(f" ❌ 收藏失败「{title}」: {result['error']}")
            else:
                saved += 1
                _op_history["favorited_feeds"].add(feed_id)
                _increment_stat("favorites")
                _auto_log_append(f" ⭐ 已收藏「{title}」@{target.get('author', '未知')}")
        if saved > 0:
            _clear_error_streak()
        # NOTE(review): source indentation was lost in the diff paste; the
        # summary log + return are assumed unconditional here, mirroring
        # the structure of auto_like_once — confirm against the repo.
        _auto_log_append(f"⭐ 收藏完成: 成功 {saved}/{len(targets)} (今日累计{_daily_stats.get('favorites', 0)})")
        return f"✅ 收藏完成!成功 {saved}/{len(targets)}\n📊 今日收藏: {_daily_stats.get('favorites', 0)}/{DAILY_LIMITS['favorites']}"
    except Exception as e:
        _record_error()
        _auto_log_append(f"❌ 一键收藏异常: {e}")
        return f"❌ 收藏失败: {e}"
def _auto_publish_with_log(topics_str, mcp_url, sd_url_val, sd_model_name, model):
"""一键发布 + 同步刷新日志"""
msg = auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model)
@ -1082,8 +1380,14 @@ def _auto_reply_with_log(max_replies, mcp_url, model, persona_text):
def auto_reply_once(max_replies, mcp_url, model, persona_text):
"""一键回复:获取我的笔记 → 加载评论 → AI 生成回复 → 发送"""
"""一键回复:获取我的笔记 → 加载评论 → AI 生成回复 → 发送(含防重复/限额)"""
try:
if _is_in_cooldown():
return "⏳ 错误冷却中,请稍后再试"
if not _check_daily_limit("replies"):
return f"🚫 今日回复已达上限 ({DAILY_LIMITS['replies']})"
persona_text = _resolve_persona(persona_text)
my_uid = cfg.get("my_user_id", "")
xsec = cfg.get("xsec_token", "")
if not my_uid:
@ -1096,6 +1400,8 @@ def auto_reply_once(max_replies, mcp_url, model, persona_text):
return "❌ LLM 未配置"
max_replies = int(max_replies) if max_replies else 3
remaining = DAILY_LIMITS["replies"] - _daily_stats.get("replies", 0)
max_replies = min(max_replies, remaining)
client = get_mcp_client(mcp_url)
_auto_log_append("💌 开始自动回复评论...")
@ -1169,10 +1475,11 @@ def auto_reply_once(max_replies, mcp_url, model, persona_text):
if not comments:
continue
# 过滤掉自己的评论,只回复他人
# 过滤掉自己的评论 & 已回复过的评论
other_comments = [
c for c in comments
if c.get("user_id") and c["user_id"] != my_uid and c.get("content")
and c.get("comment_id", "") not in _op_history["replied_comments"]
]
if not other_comments:
@ -1221,22 +1528,34 @@ def auto_reply_once(max_replies, mcp_url, model, persona_text):
else:
_auto_log_append(f" ✅ 已回复 @{nickname}")
total_replied += 1
if comment_id:
_op_history["replied_comments"].add(comment_id)
_increment_stat("replies")
if total_replied > 0:
_clear_error_streak()
if total_replied == 0:
_auto_log_append(" 没有找到需要回复的新评论")
return " 没有找到需要回复的新评论\n\n💡 可能所有评论都已回复过"
else:
_auto_log_append(f"✅ 自动回复完成,共回复 {total_replied} 条评论")
return f"✅ 自动回复完成!共回复 {total_replied} 条评论\n\n💡 小红书审核可能有延迟,请稍后查看"
_auto_log_append(f"✅ 自动回复完成,共回复 {total_replied} (今日累计{_daily_stats.get('replies', 0)})")
return f"✅ 自动回复完成!共回复 {total_replied} 条评论\n📊 今日回复: {_daily_stats.get('replies', 0)}/{DAILY_LIMITS['replies']}"
except Exception as e:
_record_error()
_auto_log_append(f"❌ 自动回复异常: {e}")
return f"❌ 自动回复失败: {e}"
def auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model):
"""一键发布:自动生成文案 → 生成图片 → 发布到小红书"""
"""一键发布:自动生成文案 → 生成图片 → 本地备份 → 发布到小红书(含限额)"""
try:
if _is_in_cooldown():
return "⏳ 错误冷却中,请稍后再试"
if not _check_daily_limit("publishes"):
return f"🚫 今日发布已达上限 ({DAILY_LIMITS['publishes']})"
topics = [t.strip() for t in topics_str.split(",") if t.strip()] if topics_str else DEFAULT_TOPICS
topic = random.choice(topics)
style = random.choice(DEFAULT_STYLES)
@ -1255,6 +1574,7 @@ def auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model):
tags = data.get("tags", [])
if not title:
_record_error()
return "❌ 文案生成失败:无标题"
_auto_log_append(f"📄 文案: {title}")
@ -1265,58 +1585,128 @@ def auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model):
sd_svc = SDService(sd_url_val)
images = sd_svc.txt2img(prompt=sd_prompt, model=sd_model_name)
if not images:
_record_error()
return "❌ 图片生成失败:没有返回图片"
_auto_log_append(f"🎨 已生成 {len(images)} 张图片")
# 保存图片到临时目录
temp_dir = os.path.join(OUTPUT_DIR, "_temp_publish")
os.makedirs(temp_dir, exist_ok=True)
image_paths = []
# 本地备份(同时用于发布)
ts = int(time.time())
safe_title = re.sub(r'[\\/*?:"<>|]', "", title)[:20]
backup_dir = os.path.join(OUTPUT_DIR, f"{ts}_{safe_title}")
os.makedirs(backup_dir, exist_ok=True)
# 保存文案
with open(os.path.join(backup_dir, "文案.txt"), "w", encoding="utf-8") as f:
f.write(f"标题: {title}\n风格: {style}\n主题: {topic}\n\n{content}\n\n标签: {', '.join(tags)}\n\nSD Prompt: {sd_prompt}")
image_paths = []
for idx, img in enumerate(images):
if isinstance(img, Image.Image):
path = os.path.abspath(os.path.join(temp_dir, f"auto_{ts}_{idx}.png"))
path = os.path.abspath(os.path.join(backup_dir, f"{idx+1}.png"))
img.save(path)
image_paths.append(path)
if not image_paths:
return "❌ 图片保存失败"
_auto_log_append(f"💾 本地已备份至: {backup_dir}")
# 发布到小红书
client = get_mcp_client(mcp_url)
result = client.publish_content(
title=title, content=content, images=image_paths, tags=tags
)
if "error" in result:
_auto_log_append(f"❌ 发布失败: {result['error']}")
return f"❌ 发布失败: {result['error']}"
_record_error()
_auto_log_append(f"❌ 发布失败: {result['error']} (文案已本地保存)")
return f"❌ 发布失败: {result['error']}\n💾 文案和图片已备份至: {backup_dir}"
_auto_log_append(f"🚀 发布成功: {title}")
return f"✅ 发布成功!\n📌 标题: {title}\n{result.get('text', '')}"
_increment_stat("publishes")
_clear_error_streak()
# 清理 _temp_publish 中的旧临时文件
temp_dir = os.path.join(OUTPUT_DIR, "_temp_publish")
try:
if os.path.exists(temp_dir):
for f in os.listdir(temp_dir):
fp = os.path.join(temp_dir, f)
if os.path.isfile(fp) and time.time() - os.path.getmtime(fp) > 3600:
os.remove(fp)
except Exception:
pass
_auto_log_append(f"🚀 发布成功: {title} (今日第{_daily_stats['publishes']}篇)")
return f"✅ 发布成功!\n📌 标题: {title}\n💾 备份: {backup_dir}\n📊 今日发布: {_daily_stats['publishes']}/{DAILY_LIMITS['publishes']}\n{result.get('text', '')}"
except Exception as e:
_record_error()
_auto_log_append(f"❌ 一键发布异常: {e}")
return f"❌ 发布失败: {e}"
# 调度器下次执行时间追踪
_scheduler_next_times = {}
def _scheduler_loop(comment_enabled, publish_enabled, reply_enabled, like_enabled,
favorite_enabled,
comment_min, comment_max, publish_min, publish_max,
reply_min, reply_max, max_replies_per_run,
like_min, like_max, like_count_per_run,
fav_min, fav_max, fav_count_per_run,
op_start_hour, op_end_hour,
keywords, topics, mcp_url, sd_url_val, sd_model_name,
model, persona_text):
"""后台定时调度循环"""
"""后台定时调度循环(含运营时段、冷却、收藏、统计)"""
_auto_log_append("🤖 自动化调度器已启动")
_auto_log_append(f"⏰ 运营时段: {int(op_start_hour)}:00 - {int(op_end_hour)}:00")
# 首次执行的随机延迟
next_comment = time.time() + random.randint(10, 60)
next_publish = time.time() + random.randint(30, 120)
next_reply = time.time() + random.randint(15, 90)
next_like = time.time() + random.randint(5, 40)
next_favorite = time.time() + random.randint(10, 50)
def _update_next_display():
"""更新下次执行时间显示"""
times = {}
if comment_enabled:
times["评论"] = datetime.fromtimestamp(next_comment).strftime("%H:%M:%S")
if like_enabled:
times["点赞"] = datetime.fromtimestamp(next_like).strftime("%H:%M:%S")
if favorite_enabled:
times["收藏"] = datetime.fromtimestamp(next_favorite).strftime("%H:%M:%S")
if reply_enabled:
times["回复"] = datetime.fromtimestamp(next_reply).strftime("%H:%M:%S")
if publish_enabled:
times["发布"] = datetime.fromtimestamp(next_publish).strftime("%H:%M:%S")
_scheduler_next_times.update(times)
_update_next_display()
while _auto_running.is_set():
now = time.time()
# 检查运营时段
if not _is_in_operating_hours(int(op_start_hour), int(op_end_hour)):
now_hour = datetime.now().hour
_auto_log_append(f"😴 当前{now_hour}时,不在运营时段({int(op_start_hour)}-{int(op_end_hour)}),休眠中...")
# 休眠到运营时间开始
for _ in range(300): # 5分钟检查一次
if not _auto_running.is_set():
break
time.sleep(1)
continue
# 检查错误冷却
if _is_in_cooldown():
remain = int(_error_cooldown_until - time.time())
if remain > 0 and remain % 30 == 0:
_auto_log_append(f"⏳ 错误冷却中,剩余 {remain}s")
time.sleep(5)
continue
# 自动评论
if comment_enabled and now >= next_comment:
try:
@ -1328,6 +1718,7 @@ def _scheduler_loop(comment_enabled, publish_enabled, reply_enabled, like_enable
interval = random.randint(int(comment_min) * 60, int(comment_max) * 60)
next_comment = time.time() + interval
_auto_log_append(f"⏰ 下次评论: {interval // 60} 分钟后")
_update_next_display()
# 自动点赞
if like_enabled and now >= next_like:
@ -1340,6 +1731,20 @@ def _scheduler_loop(comment_enabled, publish_enabled, reply_enabled, like_enable
interval = random.randint(int(like_min) * 60, int(like_max) * 60)
next_like = time.time() + interval
_auto_log_append(f"⏰ 下次点赞: {interval // 60} 分钟后")
_update_next_display()
# 自动收藏
if favorite_enabled and now >= next_favorite:
try:
_auto_log_append("--- 🔄 执行自动收藏 ---")
msg = auto_favorite_once(keywords, fav_count_per_run, mcp_url)
_auto_log_append(msg)
except Exception as e:
_auto_log_append(f"❌ 自动收藏异常: {e}")
interval = random.randint(int(fav_min) * 60, int(fav_max) * 60)
next_favorite = time.time() + interval
_auto_log_append(f"⏰ 下次收藏: {interval // 60} 分钟后")
_update_next_display()
# 自动发布
if publish_enabled and now >= next_publish:
@ -1352,6 +1757,7 @@ def _scheduler_loop(comment_enabled, publish_enabled, reply_enabled, like_enable
interval = random.randint(int(publish_min) * 60, int(publish_max) * 60)
next_publish = time.time() + interval
_auto_log_append(f"⏰ 下次发布: {interval // 60} 分钟后")
_update_next_display()
# 自动回复评论
if reply_enabled and now >= next_reply:
@ -1364,6 +1770,7 @@ def _scheduler_loop(comment_enabled, publish_enabled, reply_enabled, like_enable
interval = random.randint(int(reply_min) * 60, int(reply_max) * 60)
next_reply = time.time() + interval
_auto_log_append(f"⏰ 下次回复: {interval // 60} 分钟后")
_update_next_display()
# 每5秒检查一次停止信号
for _ in range(5):
@ -1371,13 +1778,16 @@ def _scheduler_loop(comment_enabled, publish_enabled, reply_enabled, like_enable
break
time.sleep(1)
_scheduler_next_times.clear()
_auto_log_append("🛑 自动化调度器已停止")
def start_scheduler(comment_on, publish_on, reply_on, like_on,
def start_scheduler(comment_on, publish_on, reply_on, like_on, favorite_on,
c_min, c_max, p_min, p_max, r_min, r_max,
max_replies_per_run,
l_min, l_max, like_count_per_run,
fav_min, fav_max, fav_count_per_run,
op_start_hour, op_end_hour,
keywords, topics, mcp_url, sd_url_val, sd_model_name,
model, persona_text):
"""启动定时自动化"""
@ -1385,10 +1795,10 @@ def start_scheduler(comment_on, publish_on, reply_on, like_on,
if _auto_running.is_set():
return "⚠️ 调度器已在运行中,请先停止"
if not comment_on and not publish_on and not reply_on and not like_on:
if not comment_on and not publish_on and not reply_on and not like_on and not favorite_on:
return "❌ 请至少启用一项自动化功能"
# 评论/回复需要 LLM点赞不需要
# 评论/回复需要 LLM点赞/收藏不需要
if (comment_on or reply_on):
api_key, _, _ = _get_llm_config()
if not api_key:
@ -1397,10 +1807,12 @@ def start_scheduler(comment_on, publish_on, reply_on, like_on,
_auto_running.set()
_auto_thread = threading.Thread(
target=_scheduler_loop,
args=(comment_on, publish_on, reply_on, like_on,
args=(comment_on, publish_on, reply_on, like_on, favorite_on,
c_min, c_max, p_min, p_max, r_min, r_max,
max_replies_per_run,
l_min, l_max, like_count_per_run,
fav_min, fav_max, fav_count_per_run,
op_start_hour, op_end_hour,
keywords, topics, mcp_url, sd_url_val, sd_model_name,
model, persona_text),
daemon=True,
@ -1409,16 +1821,18 @@ def start_scheduler(comment_on, publish_on, reply_on, like_on,
parts = []
if comment_on:
parts.append(f"评论 ({int(c_min)}-{int(c_max)} )")
parts.append(f"评论({int(c_min)}-{int(c_max)}分)")
if like_on:
parts.append(f"点赞 (每 {int(l_min)}-{int(l_max)} 分钟, {int(like_count_per_run)}个/轮)")
parts.append(f"点赞({int(l_min)}-{int(l_max)}分, {int(like_count_per_run)}个/轮)")
if favorite_on:
parts.append(f"收藏({int(fav_min)}-{int(fav_max)}分, {int(fav_count_per_run)}个/轮)")
if publish_on:
parts.append(f"发布 (每 {int(p_min)}-{int(p_max)} 分钟)")
parts.append(f"发布({int(p_min)}-{int(p_max)}分)")
if reply_on:
parts.append(f"回复 ({int(r_min)}-{int(r_max)} , 每轮{int(max_replies_per_run)})")
parts.append(f"回复({int(r_min)}-{int(r_max)}分, ≤{int(max_replies_per_run)}/轮)")
_auto_log_append(f"调度器已启动: {' + '.join(parts)}")
return f"✅ 自动化已启动 🟢\n任务: {' | '.join(parts)}\n\n💡 点击「刷新日志」查看实时进度"
return f"✅ 自动化已启动 🟢\n⏰ 运营时段: {int(op_start_hour)}:00-{int(op_end_hour)}:00\n任务: {' | '.join(parts)}\n\n💡 点击「刷新日志」查看实时进度"
def stop_scheduler():
@ -1438,9 +1852,22 @@ def get_auto_log():
def get_scheduler_status():
"""获取调度器运行状态"""
"""获取调度器运行状态 + 下次执行时间 + 今日统计"""
_reset_daily_stats_if_needed()
if _auto_running.is_set():
return "🟢 **调度器运行中**"
lines = ["🟢 **调度器运行中**"]
if _scheduler_next_times:
next_info = " | ".join(f"{k}@{v}" for k, v in _scheduler_next_times.items())
lines.append(f"⏰ 下次: {next_info}")
s = _daily_stats
lines.append(
f"📊 今日: 💬{s.get('comments',0)} ❤️{s.get('likes',0)} "
f"{s.get('favorites',0)} 🚀{s.get('publishes',0)} "
f"💌{s.get('replies',0)}{s.get('errors',0)}"
)
if _is_in_cooldown():
lines.append(f"⏳ 冷却中,{int(_error_cooldown_until - time.time())}s 后恢复")
return "\n".join(lines)
return "⚪ **调度器未运行**"
@ -1513,9 +1940,14 @@ with gr.Blocks(
sd_url = gr.Textbox(
label="SD WebUI URL", value=config["sd_url"], scale=2,
)
persona = gr.Textbox(
label="博主人设(评论回复用)",
value=config["persona"], scale=3,
with gr.Row():
persona = gr.Dropdown(
label="博主人设(评论/回复/自动运营通用)",
choices=[RANDOM_PERSONA_LABEL] + DEFAULT_PERSONAS,
value=config.get("persona", RANDOM_PERSONA_LABEL),
allow_custom_value=True,
interactive=True,
scale=5,
)
with gr.Row():
btn_connect_sd = gr.Button("🎨 连接 SD", size="sm")
@ -1537,7 +1969,7 @@ with gr.Blocks(
gr.Markdown("### 💡 构思")
topic = gr.Textbox(label="笔记主题", placeholder="例如:优衣库早春穿搭")
style = gr.Dropdown(
["好物种草", "干货教程", "情绪共鸣", "生活Vlog", "测评避雷", "知识科普"],
DEFAULT_STYLES,
label="风格", value="好物种草",
)
btn_gen_copy = gr.Button("✨ 第一步:生成文案", variant="primary")
@ -1861,7 +2293,7 @@ with gr.Blocks(
with gr.Tab("🤖 自动运营"):
gr.Markdown(
"### 🤖 无人值守自动化运营\n"
"> 一键评论引流 + 一键回复粉丝 + 一键内容发布 + 随机定时全自动\n\n"
"> 一键评论引流 + 一键点赞 + 一键收藏 + 一键回复 + 一键发布 + 随机定时全自动\n\n"
"⚠️ **注意**: 请确保已连接 LLM、SD WebUI 和 MCP 服务"
)
@ -1875,7 +2307,7 @@ with gr.Blocks(
)
auto_comment_keywords = gr.Textbox(
label="评论关键词池 (逗号分隔)",
value="穿搭, 美食, 护肤, 好物推荐, 旅行, 生活日常",
value=", ".join(DEFAULT_COMMENT_KEYWORDS),
placeholder="关键词1, 关键词2, ...",
)
btn_auto_comment = gr.Button(
@ -1884,7 +2316,7 @@ with gr.Blocks(
auto_comment_result = gr.Markdown("")
gr.Markdown("---")
gr.Markdown("#### <EFBFBD> 一键自动点赞")
gr.Markdown("#### 👍 一键自动点赞")
gr.Markdown(
"> 搜索笔记 → 随机选择多篇 → 依次点赞\n"
"提升账号活跃度,无需 LLM"
@ -1898,7 +2330,21 @@ with gr.Blocks(
auto_like_result = gr.Markdown("")
gr.Markdown("---")
gr.Markdown("#### <20>💌 一键自动回复")
gr.Markdown("#### ⭐ 一键自动收藏")
gr.Markdown(
"> 搜索笔记 → 随机选择多篇 → 依次收藏\n"
"提升账号活跃度,与点赞互补"
)
auto_fav_count = gr.Number(
label="单次收藏数量", value=3, minimum=1, maximum=15,
)
btn_auto_favorite = gr.Button(
"⭐ 一键收藏 (单次)", variant="primary", size="lg",
)
auto_favorite_result = gr.Markdown("")
gr.Markdown("---")
gr.Markdown("#### 💌 一键自动回复")
gr.Markdown(
"> 扫描我的所有笔记 → 找到粉丝评论 → AI 生成回复 → 逐条发送\n"
"自动跳过自己的评论,模拟真人间隔回复"
@ -1918,8 +2364,8 @@ with gr.Blocks(
)
auto_publish_topics = gr.Textbox(
label="主题池 (逗号分隔)",
value="春季穿搭, 通勤穿搭, 显瘦穿搭, 平价好物, 护肤心得, 好物分享",
placeholder="主题1, 主题2, ...",
value=", ".join(random.sample(DEFAULT_TOPICS, min(15, len(DEFAULT_TOPICS)))),
placeholder="主题会从池中随机选取,可自行修改",
)
btn_auto_publish = gr.Button(
"🚀 一键发布 (单次)", variant="primary", size="lg",
@ -1935,6 +2381,17 @@ with gr.Blocks(
)
sched_status = gr.Markdown("⚪ **调度器未运行**")
# 运营时段设置
with gr.Group():
gr.Markdown("##### ⏰ 运营时段")
with gr.Row():
sched_start_hour = gr.Number(
label="开始时间(整点)", value=8, minimum=0, maximum=23,
)
sched_end_hour = gr.Number(
label="结束时间(整点)", value=23, minimum=1, maximum=24,
)
with gr.Group():
sched_comment_on = gr.Checkbox(
label="✅ 启用自动评论", value=True,
@ -1962,6 +2419,21 @@ with gr.Blocks(
label="每轮点赞数量", value=5, minimum=1, maximum=15,
)
with gr.Group():
sched_fav_on = gr.Checkbox(
label="✅ 启用自动收藏", value=True,
)
with gr.Row():
sched_fav_min = gr.Number(
label="收藏最小间隔(分钟)", value=12, minimum=3,
)
sched_fav_max = gr.Number(
label="收藏最大间隔(分钟)", value=35, minimum=5,
)
sched_fav_count = gr.Number(
label="每轮收藏数量", value=3, minimum=1, maximum=10,
)
with gr.Group():
sched_reply_on = gr.Checkbox(
label="✅ 启用自动回复评论", value=True,
@ -1999,16 +2471,24 @@ with gr.Blocks(
sched_result = gr.Markdown("")
gr.Markdown("---")
with gr.Row():
with gr.Column(scale=2):
gr.Markdown("#### 📋 运行日志")
with gr.Row():
btn_refresh_log = gr.Button("🔄 刷新日志", size="sm")
btn_clear_log = gr.Button("🗑️ 清空日志", size="sm")
btn_refresh_stats = gr.Button("📊 刷新统计", size="sm")
auto_log_display = gr.TextArea(
label="自动化运行日志",
value="📋 暂无日志\n\n💡 执行操作后日志将在此显示",
lines=15,
interactive=False,
)
with gr.Column(scale=1):
gr.Markdown("#### 📊 今日运营统计")
auto_stats_display = gr.Markdown(
value=_get_stats_summary(),
)
# ==================================================
# 事件绑定
@ -2209,6 +2689,11 @@ with gr.Blocks(
inputs=[auto_comment_keywords, auto_like_count, mcp_url],
outputs=[auto_like_result, auto_log_display],
)
btn_auto_favorite.click(
fn=_auto_favorite_with_log,
inputs=[auto_comment_keywords, auto_fav_count, mcp_url],
outputs=[auto_favorite_result, auto_log_display],
)
btn_auto_reply.click(
fn=_auto_reply_with_log,
inputs=[auto_reply_max, mcp_url, llm_model, persona],
@ -2222,9 +2707,12 @@ with gr.Blocks(
btn_start_sched.click(
fn=start_scheduler,
inputs=[sched_comment_on, sched_publish_on, sched_reply_on, sched_like_on,
sched_fav_on,
sched_c_min, sched_c_max, sched_p_min, sched_p_max,
sched_r_min, sched_r_max, sched_reply_max,
sched_l_min, sched_l_max, sched_like_count,
sched_fav_min, sched_fav_max, sched_fav_count,
sched_start_hour, sched_end_hour,
auto_comment_keywords, auto_publish_topics,
mcp_url, sd_url, sd_model, llm_model, persona],
outputs=[sched_result],
@ -2244,6 +2732,11 @@ with gr.Blocks(
inputs=[],
outputs=[auto_log_display],
)
btn_refresh_stats.click(
fn=lambda: (get_scheduler_status(), _get_stats_summary()),
inputs=[],
outputs=[sched_status, auto_stats_display],
)
# ---- 启动时自动刷新 SD ----
app.load(fn=connect_sd, inputs=[sd_url], outputs=[sd_model, status_bar])

View File

@ -12,14 +12,16 @@ logger = logging.getLogger(__name__)
SD_TIMEOUT = 900 # 图片生成可能需要较长时间
# 默认反向提示词(针对 JuggernautXL / SDXL 优化
# 默认反向提示词(针对 JuggernautXL / SDXL 优化,偏向东方审美
DEFAULT_NEGATIVE = (
"nsfw, nudity, lowres, bad anatomy, bad hands, text, error, missing fingers, "
"extra digit, fewer digits, cropped, worst quality, low quality, normal quality, "
"jpeg artifacts, signature, watermark, blurry, deformed, mutated, disfigured, "
"ugly, duplicate, morbid, mutilated, poorly drawn face, poorly drawn hands, "
"extra limbs, fused fingers, too many fingers, long neck, username, "
"out of frame, distorted, oversaturated, underexposed, overexposed"
"out of frame, distorted, oversaturated, underexposed, overexposed, "
"western face, european face, caucasian, deep-set eyes, high nose bridge, "
"blonde hair, red hair, blue eyes, green eyes, freckles, thick body hair"
)