""" services/scheduler.py 自动化运营调度器:状态管理、自动评论/点赞/收藏/发布/回复、定时循环、数据学习调度 """ import os import re import time import random import threading import logging from datetime import datetime from PIL import Image from .config_manager import ConfigManager, OUTPUT_DIR from .llm_service import LLMService from .sd_service import SDService from .mcp_client import get_mcp_client from .rate_limiter import ( _op_history, _daily_stats, DAILY_LIMITS, _reset_daily_stats_if_needed, _check_daily_limit, _increment_stat, _record_error, _clear_error_streak, _is_in_cooldown, _is_in_operating_hours, _get_stats_summary, ) from .persona import ( DEFAULT_TOPICS, DEFAULT_STYLES, DEFAULT_COMMENT_KEYWORDS, get_persona_keywords, _resolve_persona, ) from .connection import _get_llm_config from .analytics_service import AnalyticsService cfg = ConfigManager() logger = logging.getLogger("autobot") # AnalyticsService 注入(通过 configure_analytics() 设置) _analytics: "AnalyticsService | None" = None def configure_analytics(analytics_svc: "AnalyticsService"): """从 main.py 注入 AnalyticsService 实例""" global _analytics _analytics = analytics_svc # ================================================== # 自动化运营模块 # ================================================== # 自动化状态 _auto_running = threading.Event() _auto_thread: threading.Thread | None = None _auto_log: list[str] = [] from .rate_limiter import ( _op_history, _daily_stats, DAILY_LIMITS, _reset_daily_stats_if_needed, _check_daily_limit, _increment_stat, _record_error, _clear_error_streak, _is_in_cooldown, _is_in_operating_hours, _get_stats_summary, ) from .persona import ( DEFAULT_PERSONAS, RANDOM_PERSONA_LABEL, PERSONA_POOL_MAP, DEFAULT_TOPICS, DEFAULT_STYLES, DEFAULT_COMMENT_KEYWORDS, _match_persona_pools, get_persona_topics, get_persona_keywords, on_persona_changed, _resolve_persona, ) def _auto_log_append(msg: str): """记录自动化日志""" ts = datetime.now().strftime("%H:%M:%S") entry = f"[{ts}] {msg}" _auto_log.append(entry) if len(_auto_log) > 500: _auto_log[:] = _auto_log[-300:] logger.info("[自动化] %s", msg) def _auto_comment_with_log(keywords_str, mcp_url, model, persona_text): """一键评论 + 同步刷新日志""" msg = auto_comment_once(keywords_str, mcp_url, model, persona_text) return msg, get_auto_log() def auto_comment_once(keywords_str, mcp_url, model, persona_text): """一键评论:自动搜索高赞笔记 → AI生成评论 → 发送(含防重复/限额/冷却)""" try: if _is_in_cooldown(): return "⏳ 错误冷却中,请稍后再试" if not _check_daily_limit("comments"): return f"🚫 今日评论已达上限 ({DAILY_LIMITS['comments']})" persona_text = _resolve_persona(persona_text) # 如果用户未手动修改关键词池,则使用人设匹配的专属关键词池 persona_keywords = get_persona_keywords(persona_text) keywords = [k.strip() for k in keywords_str.split(",") if k.strip()] if keywords_str else persona_keywords keyword = random.choice(keywords) _auto_log_append(f"🔍 搜索关键词: {keyword}") client = get_mcp_client(mcp_url) # 随机切换搜索排序,丰富互动对象 sort_options = ["最多点赞", "综合", "最新"] sort_by = random.choice(sort_options) # 搜索高赞笔记 entries = client.search_feeds_parsed(keyword, sort_by=sort_by) if not entries: _auto_log_append("⚠️ 搜索无结果,尝试推荐列表") entries = client.list_feeds_parsed() if not entries: _record_error() return "❌ 未找到任何笔记" # 过滤掉自己的笔记 & 已评论过的笔记 my_uid = cfg.get("my_user_id", "") entries = [ e for e in entries if e.get("user_id") != my_uid and e.get("feed_id") not in _op_history["commented_feeds"] ] if not entries: return "ℹ️ 搜索结果中所有笔记都已评论过,换个关键词试试" # 从前10个中随机选择 target = random.choice(entries[:min(10, len(entries))]) feed_id = target["feed_id"] xsec_token = target["xsec_token"] title = target.get("title", "未知") _auto_log_append(f"🎯 选中: {title[:30]} (@{target.get('author', '未知')}) [排序:{sort_by}]") if not feed_id or not xsec_token: return "❌ 笔记缺少必要参数 (feed_id/xsec_token)" # 模拟浏览延迟 time.sleep(random.uniform(3, 8)) # 加载笔记详情 result = client.get_feed_detail(feed_id, xsec_token, load_all_comments=True) if "error" in result: _record_error() return f"❌ 加载笔记失败: {result['error']}" full_text = result.get("text", "") if "评论" in full_text: parts = full_text.split("评论", 1) content_part = parts[0].strip()[:600] comments_part = ("评论" + parts[1])[:800] if len(parts) > 1 else "" else: content_part = full_text[:500] comments_part = "" # AI 生成评论 api_key, base_url, _ = _get_llm_config() if not api_key: return "❌ LLM 未配置,请先在全局设置中配置提供商" svc = LLMService(api_key, base_url, model) comment = svc.generate_proactive_comment( persona_text, title, content_part, comments_part ) _auto_log_append(f"💬 生成评论: {comment[:60]}...") # 随机等待后发送 time.sleep(random.uniform(3, 10)) result = client.post_comment(feed_id, xsec_token, comment) resp_text = result.get("text", "") _auto_log_append(f"📡 MCP 响应: {resp_text[:200]}") if "error" in result: _record_error() _auto_log_append(f"❌ 评论发送失败: {result['error']}") return f"❌ 评论发送失败: {result['error']}" # 记录成功操作 _op_history["commented_feeds"].add(feed_id) _increment_stat("comments") _clear_error_streak() _auto_log_append(f"✅ 评论已发送到「{title[:20]}」 (今日第{_daily_stats['comments']}条)") return f"✅ 已评论「{title[:25]}」\n📝 评论: {comment}\n📊 今日评论: {_daily_stats['comments']}/{DAILY_LIMITS['comments']}" except Exception as e: _record_error() _auto_log_append(f"❌ 一键评论异常: {e}") return f"❌ 评论失败: {e}" def _auto_like_with_log(keywords_str, like_count, mcp_url): """一键点赞 + 同步刷新日志""" msg = auto_like_once(keywords_str, like_count, mcp_url) return msg, get_auto_log() def auto_like_once(keywords_str, like_count, mcp_url): """一键点赞:搜索/推荐笔记 → 随机选择 → 批量点赞(含防重复/限额)""" try: if _is_in_cooldown(): return "⏳ 错误冷却中,请稍后再试" if not _check_daily_limit("likes"): return f"🚫 今日点赞已达上限 ({DAILY_LIMITS['likes']})" keywords = [k.strip() for k in keywords_str.split(",") if k.strip()] if keywords_str else DEFAULT_COMMENT_KEYWORDS keyword = random.choice(keywords) like_count = int(like_count) if like_count else 5 # 不超过当日剩余额度 remaining = DAILY_LIMITS["likes"] - _daily_stats.get("likes", 0) like_count = min(like_count, remaining) _auto_log_append(f"👍 点赞关键词: {keyword} | 目标: {like_count} 个") client = get_mcp_client(mcp_url) # 搜索笔记 entries = client.search_feeds_parsed(keyword, sort_by="综合") if not entries: _auto_log_append("⚠️ 搜索无结果,尝试推荐列表") entries = client.list_feeds_parsed() if not entries: _record_error() return "❌ 未找到任何笔记" # 过滤自己的笔记 & 已点赞过的 my_uid = cfg.get("my_user_id", "") entries = [ e for e in entries if e.get("user_id") != my_uid and e.get("feed_id") not in _op_history["liked_feeds"] ] if not entries: return "ℹ️ 搜索结果中所有笔记都已点赞过" # 随机打乱,取前 N 个 random.shuffle(entries) targets = entries[:min(like_count, len(entries))] liked = 0 for target in targets: feed_id = target.get("feed_id", "") xsec_token = target.get("xsec_token", "") title = target.get("title", "未知")[:25] if not feed_id or not xsec_token: continue # 模拟浏览延迟 time.sleep(random.uniform(2, 6)) result = client.like_feed(feed_id, xsec_token) if "error" in result: _auto_log_append(f" ❌ 点赞失败「{title}」: {result['error']}") else: liked += 1 _op_history["liked_feeds"].add(feed_id) _increment_stat("likes") _auto_log_append(f" ❤️ 已点赞「{title}」@{target.get('author', '未知')}") if liked > 0: _clear_error_streak() _auto_log_append(f"👍 点赞完成: 成功 {liked}/{len(targets)} (今日累计{_daily_stats.get('likes', 0)})") return f"✅ 点赞完成!成功 {liked}/{len(targets)} 个\n📊 今日点赞: {_daily_stats.get('likes', 0)}/{DAILY_LIMITS['likes']}" except Exception as e: _record_error() _auto_log_append(f"❌ 一键点赞异常: {e}") return f"❌ 点赞失败: {e}" def _auto_favorite_with_log(keywords_str, fav_count, mcp_url): """一键收藏 + 同步刷新日志""" msg = auto_favorite_once(keywords_str, fav_count, mcp_url) return msg, get_auto_log() def auto_favorite_once(keywords_str, fav_count, mcp_url): """一键收藏:搜索优质笔记 → 随机选择 → 批量收藏(含防重复/限额)""" try: if _is_in_cooldown(): return "⏳ 错误冷却中,请稍后再试" if not _check_daily_limit("favorites"): return f"🚫 今日收藏已达上限 ({DAILY_LIMITS['favorites']})" keywords = [k.strip() for k in keywords_str.split(",") if k.strip()] if keywords_str else DEFAULT_COMMENT_KEYWORDS keyword = random.choice(keywords) fav_count = int(fav_count) if fav_count else 3 remaining = DAILY_LIMITS["favorites"] - _daily_stats.get("favorites", 0) fav_count = min(fav_count, remaining) _auto_log_append(f"⭐ 收藏关键词: {keyword} | 目标: {fav_count} 个") client = get_mcp_client(mcp_url) entries = client.search_feeds_parsed(keyword, sort_by="最多收藏") if not entries: entries = client.list_feeds_parsed() if not entries: _record_error() return "❌ 未找到任何笔记" my_uid = cfg.get("my_user_id", "") entries = [ e for e in entries if e.get("user_id") != my_uid and e.get("feed_id") not in _op_history["favorited_feeds"] ] if not entries: return "ℹ️ 搜索结果中所有笔记都已收藏过" random.shuffle(entries) targets = entries[:min(fav_count, len(entries))] saved = 0 for target in targets: feed_id = target.get("feed_id", "") xsec_token = target.get("xsec_token", "") title = target.get("title", "未知")[:25] if not feed_id or not xsec_token: continue time.sleep(random.uniform(2, 6)) result = client.favorite_feed(feed_id, xsec_token) if "error" in result: _auto_log_append(f" ❌ 收藏失败「{title}」: {result['error']}") else: saved += 1 _op_history["favorited_feeds"].add(feed_id) _increment_stat("favorites") _auto_log_append(f" ⭐ 已收藏「{title}」@{target.get('author', '未知')}") if saved > 0: _clear_error_streak() _auto_log_append(f"⭐ 收藏完成: 成功 {saved}/{len(targets)} (今日累计{_daily_stats.get('favorites', 0)})") return f"✅ 收藏完成!成功 {saved}/{len(targets)} 个\n📊 今日收藏: {_daily_stats.get('favorites', 0)}/{DAILY_LIMITS['favorites']}" except Exception as e: _record_error() _auto_log_append(f"❌ 一键收藏异常: {e}") return f"❌ 收藏失败: {e}" def _auto_publish_with_log(topics_str, mcp_url, sd_url_val, sd_model_name, model, persona_text, quality_mode_val, face_swap_on): """一键发布 + 同步刷新日志""" msg = auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model, persona_text=persona_text, quality_mode_val=quality_mode_val, face_swap_on=face_swap_on) return msg, get_auto_log() def _auto_reply_with_log(max_replies, mcp_url, model, persona_text): """一键回复 + 同步刷新日志""" msg = auto_reply_once(max_replies, mcp_url, model, persona_text) return msg, get_auto_log() def auto_reply_once(max_replies, mcp_url, model, persona_text): """一键回复:获取我的笔记 → 加载评论 → AI 生成回复 → 发送(含防重复/限额)""" try: if _is_in_cooldown(): return "⏳ 错误冷却中,请稍后再试" if not _check_daily_limit("replies"): return f"🚫 今日回复已达上限 ({DAILY_LIMITS['replies']})" persona_text = _resolve_persona(persona_text) my_uid = cfg.get("my_user_id", "") xsec = cfg.get("xsec_token", "") if not my_uid: return "❌ 未配置用户 ID,请到「账号登录」页填写" if not xsec: return "❌ 未获取 xsec_token,请先登录" api_key, base_url, _ = _get_llm_config() if not api_key: return "❌ LLM 未配置" max_replies = int(max_replies) if max_replies else 3 remaining = DAILY_LIMITS["replies"] - _daily_stats.get("replies", 0) max_replies = min(max_replies, remaining) client = get_mcp_client(mcp_url) _auto_log_append("💌 开始自动回复评论...") # Step 1: 获取我的笔记列表 result = client.get_user_profile(my_uid, xsec) if "error" in result: _auto_log_append(f"❌ 获取我的笔记失败: {result['error']}") return f"❌ 获取我的笔记失败: {result['error']}" # 解析笔记列表 raw = result.get("raw", {}) text = result.get("text", "") data = None if raw and isinstance(raw, dict): for item in raw.get("content", []): if item.get("type") == "text": try: data = json.loads(item["text"]) except (json.JSONDecodeError, KeyError): pass if not data: try: data = json.loads(text) except (json.JSONDecodeError, TypeError): pass feeds = (data or {}).get("feeds") or [] if not feeds: _auto_log_append("⚠️ 未找到任何笔记") return "⚠️ 未找到你的笔记" # 构建笔记条目 my_entries = [] for f in feeds: nc = f.get("noteCard") or {} my_entries.append({ "feed_id": f.get("id", ""), "xsec_token": f.get("xsecToken", ""), "title": nc.get("displayTitle", "未知标题"), }) _auto_log_append(f"📝 找到 {len(my_entries)} 篇笔记,开始扫描评论...") # Step 2: 遍历笔记,找到未回复的评论 total_replied = 0 svc = LLMService(api_key, base_url, model) for entry in my_entries: if total_replied >= max_replies: break feed_id = entry["feed_id"] xsec_token = entry["xsec_token"] title = entry["title"] if not feed_id or not xsec_token: continue time.sleep(random.uniform(1, 3)) # 加载笔记评论(使用结构化接口) comments = client.get_feed_comments(feed_id, xsec_token, load_all=True) if not comments: continue # 过滤掉自己的评论 & 已回复过的评论 other_comments = [ c for c in comments if c.get("user_id") and c["user_id"] != my_uid and c.get("content") and c.get("comment_id", "") not in _op_history["replied_comments"] ] if not other_comments: continue _auto_log_append(f"📖「{title[:20]}」有 {len(other_comments)} 条他人评论") for comment in other_comments: if total_replied >= max_replies: break comment_id = comment.get("comment_id", "") comment_uid = comment.get("user_id", "") comment_text = comment.get("content", "") nickname = comment.get("nickname", "网友") if not comment_text.strip(): continue _auto_log_append(f" 💬 @{nickname}: {comment_text[:40]}...") # AI 生成回复 try: reply = svc.generate_reply(persona_text, title, comment_text) except Exception as e: _auto_log_append(f" ❌ AI 回复生成失败: {e}") continue _auto_log_append(f" 🤖 回复: {reply[:50]}...") # 发送回复 time.sleep(random.uniform(2, 6)) if comment_id and comment_uid: # 使用 reply_comment 精确回复 resp = client.reply_comment( feed_id, xsec_token, comment_id, comment_uid, reply ) else: # 没有 comment_id 就用 post_comment 发到笔记下 resp = client.post_comment(feed_id, xsec_token, f"@{nickname} {reply}") resp_text = resp.get("text", "") if "error" in resp: _auto_log_append(f" ❌ 回复发送失败: {resp['error']}") else: _auto_log_append(f" ✅ 已回复 @{nickname}") total_replied += 1 if comment_id: _op_history["replied_comments"].add(comment_id) _increment_stat("replies") if total_replied > 0: _clear_error_streak() if total_replied == 0: _auto_log_append("ℹ️ 没有找到需要回复的新评论") return "ℹ️ 没有找到需要回复的新评论\n\n💡 可能所有评论都已回复过" else: _auto_log_append(f"✅ 自动回复完成,共回复 {total_replied} 条 (今日累计{_daily_stats.get('replies', 0)})") return f"✅ 自动回复完成!共回复 {total_replied} 条评论\n📊 今日回复: {_daily_stats.get('replies', 0)}/{DAILY_LIMITS['replies']}" except Exception as e: _record_error() _auto_log_append(f"❌ 自动回复异常: {e}") return f"❌ 自动回复失败: {e}" def auto_publish_once(topics_str, mcp_url, sd_url_val, sd_model_name, model, persona_text=None, quality_mode_val=None, face_swap_on=False): """一键发布:生成内容 → 加入发布队列(自动排期 + 自动审核)。 实际发布由 QueuePublisher 后台处理器完成。 """ try: if _is_in_cooldown(): return "⏳ 错误冷却中,请稍后再试" if not _check_daily_limit("publishes"): return f"🚫 今日发布已达上限 ({DAILY_LIMITS['publishes']})" # 延迟导入避免循环依赖 from .queue_ops import generate_to_queue topics = topics_str if topics_str else ",".join(DEFAULT_TOPICS) msg = generate_to_queue( topics, sd_url_val, sd_model_name, model, persona_text=persona_text, quality_mode_val=quality_mode_val, face_swap_on=face_swap_on, count=1, auto_schedule=True, auto_approve=True, ) _auto_log_append(f"📋 内容已入队: {msg}") # 检查 QueuePublisher 是否在运行 try: from .queue_ops import _queue_publisher if _queue_publisher and not _queue_publisher.is_running: _auto_log_append("⚠️ 队列处理器未启动,内容已入队但需启动处理器以自动发布") return msg + "\n⚠️ 请启动队列处理器以自动发布" except ImportError: pass return msg except Exception as e: _record_error() _auto_log_append(f"❌ 一键发布异常: {e}") return f"❌ 发布失败: {e}" _scheduler_next_times = {} def _scheduler_loop(comment_enabled, publish_enabled, reply_enabled, like_enabled, favorite_enabled, comment_min, comment_max, publish_min, publish_max, reply_min, reply_max, max_replies_per_run, like_min, like_max, like_count_per_run, fav_min, fav_max, fav_count_per_run, op_start_hour, op_end_hour, keywords, topics, mcp_url, sd_url_val, sd_model_name, model, persona_text, quality_mode_val=None, face_swap_on=False): """后台定时调度循环(含运营时段、冷却、收藏、统计)""" _auto_log_append("🤖 自动化调度器已启动") _auto_log_append(f"⏰ 运营时段: {int(op_start_hour)}:00 - {int(op_end_hour)}:00") # 首次执行的随机延迟 next_comment = time.time() + random.randint(10, 60) next_publish = time.time() + random.randint(30, 120) next_reply = time.time() + random.randint(15, 90) next_like = time.time() + random.randint(5, 40) next_favorite = time.time() + random.randint(10, 50) def _update_next_display(): """更新下次执行时间显示""" times = {} if comment_enabled: times["评论"] = datetime.fromtimestamp(next_comment).strftime("%H:%M:%S") if like_enabled: times["点赞"] = datetime.fromtimestamp(next_like).strftime("%H:%M:%S") if favorite_enabled: times["收藏"] = datetime.fromtimestamp(next_favorite).strftime("%H:%M:%S") if reply_enabled: times["回复"] = datetime.fromtimestamp(next_reply).strftime("%H:%M:%S") if publish_enabled: times["发布"] = datetime.fromtimestamp(next_publish).strftime("%H:%M:%S") _scheduler_next_times.update(times) _update_next_display() while _auto_running.is_set(): now = time.time() # 检查运营时段 if not _is_in_operating_hours(int(op_start_hour), int(op_end_hour)): now_hour = datetime.now().hour _auto_log_append(f"😴 当前{now_hour}时,不在运营时段({int(op_start_hour)}-{int(op_end_hour)}),休眠中...") # 休眠到运营时间开始 for _ in range(300): # 5分钟检查一次 if not _auto_running.is_set(): break time.sleep(1) continue # 检查错误冷却 if _is_in_cooldown(): remain = int(_error_cooldown_until - time.time()) if remain > 0 and remain % 30 == 0: _auto_log_append(f"⏳ 错误冷却中,剩余 {remain}s") time.sleep(5) continue # 自动评论 if comment_enabled and now >= next_comment: try: _auto_log_append("--- 🔄 执行自动评论 ---") msg = auto_comment_once(keywords, mcp_url, model, persona_text) _auto_log_append(msg) except Exception as e: _auto_log_append(f"❌ 自动评论异常: {e}") interval = random.randint(int(comment_min) * 60, int(comment_max) * 60) next_comment = time.time() + interval _auto_log_append(f"⏰ 下次评论: {interval // 60} 分钟后") _update_next_display() # 自动点赞 if like_enabled and now >= next_like: try: _auto_log_append("--- 🔄 执行自动点赞 ---") msg = auto_like_once(keywords, like_count_per_run, mcp_url) _auto_log_append(msg) except Exception as e: _auto_log_append(f"❌ 自动点赞异常: {e}") interval = random.randint(int(like_min) * 60, int(like_max) * 60) next_like = time.time() + interval _auto_log_append(f"⏰ 下次点赞: {interval // 60} 分钟后") _update_next_display() # 自动收藏 if favorite_enabled and now >= next_favorite: try: _auto_log_append("--- 🔄 执行自动收藏 ---") msg = auto_favorite_once(keywords, fav_count_per_run, mcp_url) _auto_log_append(msg) except Exception as e: _auto_log_append(f"❌ 自动收藏异常: {e}") interval = random.randint(int(fav_min) * 60, int(fav_max) * 60) next_favorite = time.time() + interval _auto_log_append(f"⏰ 下次收藏: {interval // 60} 分钟后") _update_next_display() # 自动发布(通过队列) if publish_enabled and now >= next_publish: try: _auto_log_append("--- 🔄 执行自动发布(队列模式) ---") from .queue_ops import generate_to_queue msg = generate_to_queue( topics, sd_url_val, sd_model_name, model, persona_text=persona_text, quality_mode_val=quality_mode_val, face_swap_on=face_swap_on, count=1, auto_schedule=True, auto_approve=True, ) _auto_log_append(msg) except Exception as e: _auto_log_append(f"❌ 自动发布异常: {e}") interval = random.randint(int(publish_min) * 60, int(publish_max) * 60) next_publish = time.time() + interval _auto_log_append(f"⏰ 下次发布: {interval // 60} 分钟后") _update_next_display() # 自动回复评论 if reply_enabled and now >= next_reply: try: _auto_log_append("--- 🔄 执行自动回复评论 ---") msg = auto_reply_once(max_replies_per_run, mcp_url, model, persona_text) _auto_log_append(msg) except Exception as e: _auto_log_append(f"❌ 自动回复异常: {e}") interval = random.randint(int(reply_min) * 60, int(reply_max) * 60) next_reply = time.time() + interval _auto_log_append(f"⏰ 下次回复: {interval // 60} 分钟后") _update_next_display() # 每5秒检查一次停止信号 for _ in range(5): if not _auto_running.is_set(): break time.sleep(1) _scheduler_next_times.clear() _auto_log_append("🛑 自动化调度器已停止") def start_scheduler(comment_on, publish_on, reply_on, like_on, favorite_on, c_min, c_max, p_min, p_max, r_min, r_max, max_replies_per_run, l_min, l_max, like_count_per_run, fav_min, fav_max, fav_count_per_run, op_start_hour, op_end_hour, keywords, topics, mcp_url, sd_url_val, sd_model_name, model, persona_text, quality_mode_val, face_swap_on): """启动定时自动化""" global _auto_thread if _auto_running.is_set(): return "⚠️ 调度器已在运行中,请先停止" if not comment_on and not publish_on and not reply_on and not like_on and not favorite_on: return "❌ 请至少启用一项自动化功能" # 评论/回复需要 LLM,点赞/收藏不需要 if (comment_on or reply_on): api_key, _, _ = _get_llm_config() if not api_key: return "❌ LLM 未配置,请先在全局设置中配置提供商" _auto_running.set() _auto_thread = threading.Thread( target=_scheduler_loop, args=(comment_on, publish_on, reply_on, like_on, favorite_on, c_min, c_max, p_min, p_max, r_min, r_max, max_replies_per_run, l_min, l_max, like_count_per_run, fav_min, fav_max, fav_count_per_run, op_start_hour, op_end_hour, keywords, topics, mcp_url, sd_url_val, sd_model_name, model, persona_text), kwargs={"quality_mode_val": quality_mode_val, "face_swap_on": face_swap_on}, daemon=True, ) _auto_thread.start() parts = [] if comment_on: parts.append(f"评论({int(c_min)}-{int(c_max)}分)") if like_on: parts.append(f"点赞({int(l_min)}-{int(l_max)}分, {int(like_count_per_run)}个/轮)") if favorite_on: parts.append(f"收藏({int(fav_min)}-{int(fav_max)}分, {int(fav_count_per_run)}个/轮)") if publish_on: parts.append(f"发布({int(p_min)}-{int(p_max)}分)") if reply_on: parts.append(f"回复({int(r_min)}-{int(r_max)}分, ≤{int(max_replies_per_run)}条/轮)") _auto_log_append(f"调度器已启动: {' + '.join(parts)}") return f"✅ 自动化已启动 🟢\n⏰ 运营时段: {int(op_start_hour)}:00-{int(op_end_hour)}:00\n任务: {' | '.join(parts)}\n\n💡 点击「刷新日志」查看实时进度" def stop_scheduler(): """停止定时自动化""" if not _auto_running.is_set(): return "⚠️ 调度器未在运行" _auto_running.clear() _auto_log_append("⏹️ 收到停止信号,等待当前任务完成...") return "🛑 调度器停止中...当前任务完成后将完全停止" def get_auto_log(): """获取自动化运行日志""" if not _auto_log: return "📋 暂无日志\n\n💡 点击「一键评论」「一键发布」或启动定时后日志将在此显示" return "\n".join(_auto_log[-80:]) def get_scheduler_status(): """获取调度器运行状态 + 下次执行时间 + 今日统计""" _reset_daily_stats_if_needed() if _auto_running.is_set(): lines = ["🟢 **调度器运行中**"] if _scheduler_next_times: next_info = " | ".join(f"{k}@{v}" for k, v in _scheduler_next_times.items()) lines.append(f"⏰ 下次: {next_info}") s = _daily_stats lines.append( f"📊 今日: 💬{s.get('comments',0)} ❤️{s.get('likes',0)} " f"⭐{s.get('favorites',0)} 🚀{s.get('publishes',0)} " f"💌{s.get('replies',0)} ❌{s.get('errors',0)}" ) if _is_in_cooldown(): lines.append(f"⏳ 冷却中,{int(_error_cooldown_until - time.time())}s 后恢复") return "\n".join(lines) return "⚪ **调度器未运行**" # ================================================== # 智能学习 & 笔记分析模块 # ================================================== # 定时学习状态 _learn_running = threading.Event() _learn_thread: threading.Thread | None = None def analytics_collect_data(mcp_url, user_id, xsec_token): """采集笔记表现数据""" if not user_id or not xsec_token: return "❌ 请先填写用户 ID 和 xsec_token (在「账号登录」Tab 获取)" try: client = get_mcp_client(mcp_url) result = _analytics.collect_note_performance(client, user_id, xsec_token) if "error" in result: return f"❌ 数据采集失败: {result['error']}" return ( f"✅ 数据采集完成!\n" f"📝 总笔记数: {result['total']}\n" f"🔄 更新: {result['updated']} 篇\n\n" f"💡 点击「计算权重」进行智能学习" ) except Exception as e: logger.error("数据采集失败: %s", e) return f"❌ 采集失败: {e}" def analytics_calculate_weights(): """计算内容权重""" try: result = _analytics.calculate_weights() if "error" in result: return "❌ " + result["error"], _analytics.generate_report() top = result.get("top_note") top_str = f" | 🏆 最佳: {top['title']} (❤️ {top.get('likes', 0)})" if top else "" msg = ( f"✅ 权重计算完成!\n" f"📊 分析了 {result['total_notes']} 篇笔记{top_str}\n\n" f"💡 权重已自动保存,启用「智能加权发布」后自动生效" ) return msg, _analytics.generate_report() except Exception as e: logger.error("权重计算失败: %s", e) return f"❌ 计算失败: {e}", "" def analytics_llm_deep_analysis(model): """LLM 深度分析笔记表现""" note_data = _analytics.generate_llm_analysis_prompt() if not note_data: return "❌ 暂无笔记数据,请先采集" try: api_key, base_url, _ = _get_llm_config() if not api_key: return "❌ LLM 未配置" svc = LLMService(api_key, base_url, model) result = svc.analyze_note_performance(note_data) lines = ["## 🧠 AI 深度分析报告\n"] if result.get("high_perform_features"): lines.append(f"### ✅ 高表现内容特征\n{result['high_perform_features']}\n") if result.get("low_perform_issues"): lines.append(f"### ⚠️ 低表现内容反思\n{result['low_perform_issues']}\n") if result.get("user_preference"): lines.append(f"### 👤 用户偏好画像\n{result['user_preference']}\n") suggestions = result.get("content_suggestions", []) if suggestions: lines.append("### 📌 内容方向建议") for s in suggestions: priority = "🔴" if s.get("priority", 3) <= 2 else "🟡" if s.get("priority", 3) <= 3 else "🟢" lines.append(f"- {priority} **{s.get('topic', '')}**: {s.get('reason', '')}") lines.append("") templates = result.get("title_templates", []) if templates: lines.append("### ✏️ 标题模板") for t in templates: lines.append(f"- 📝 {t}") lines.append("") tags = result.get("recommended_tags", []) if tags: lines.append(f"### 🏷️ 推荐标签\n{' '.join(f'`#{t}`' for t in tags)}\n") return "\n".join(lines) except Exception as e: logger.error("LLM 分析失败: %s", e) return f"❌ AI 分析失败: {e}" def analytics_get_report(): """获取分析报告""" return _analytics.generate_report() def analytics_get_weighted_topics(): """获取加权主题列表""" weighted = _analytics.get_weighted_topics_display() if weighted: return weighted return "暂无权重数据,请先执行「采集数据 → 计算权重」" def _learn_scheduler_loop(mcp_url, user_id, xsec_token, model, interval_hours): """定时学习后台循环""" logger.info("定时学习已启动, 间隔 %s 小时", interval_hours) _auto_log_append(f"🧠 定时学习已启动, 每 {interval_hours} 小时自动分析一次") while _learn_running.is_set(): try: # 采集数据 client = get_mcp_client(mcp_url) result = _analytics.collect_note_performance(client, user_id, xsec_token) if "error" not in result: _auto_log_append(f"🧠 自动采集完成: {result['total']} 篇笔记, 更新 {result['updated']} 篇") # 计算权重 weight_result = _analytics.calculate_weights() if "error" not in weight_result: _auto_log_append(f"🧠 权重更新完成: 分析 {weight_result['total_notes']} 篇") # LLM 深度分析 (如果有配置) api_key, base_url, _ = _get_llm_config() if api_key and model: try: note_data = _analytics.generate_llm_analysis_prompt() if note_data: svc = LLMService(api_key, base_url, model) svc.analyze_note_performance(note_data) _auto_log_append("🧠 AI 深度分析完成") except Exception as e: _auto_log_append(f"⚠️ AI 分析失败 (非致命): {e}") else: _auto_log_append(f"⚠️ 自动采集失败: {result.get('error', '未知')}") except Exception as e: _auto_log_append(f"⚠️ 定时学习异常: {e}") # 等待下一次执行 wait_seconds = interval_hours * 3600 for _ in range(int(wait_seconds / 5)): if not _learn_running.is_set(): break time.sleep(5) logger.info("定时学习已停止") _auto_log_append("🧠 定时学习已停止") def start_learn_scheduler(mcp_url, user_id, xsec_token, model, interval_hours): """启动定时学习""" global _learn_thread if _learn_running.is_set(): return "⚠️ 定时学习已在运行中" if not user_id or not xsec_token: return "❌ 请先在「账号登录」获取用户 ID 和 Token" _learn_running.set() _learn_thread = threading.Thread( target=_learn_scheduler_loop, args=(mcp_url, user_id, xsec_token, model, interval_hours), daemon=True, ) _learn_thread.start() return f"✅ 定时学习已启动 🧠 每 {int(interval_hours)} 小时自动分析" def stop_learn_scheduler(): """停止定时学习""" if not _learn_running.is_set(): return "⚠️ 定时学习未在运行" _learn_running.clear() return "🛑 定时学习已停止" # ================================================== # 热点自动采集 # ================================================== _hotspot_collector_running = threading.Event() _hotspot_collector_thread: threading.Thread | None = None def _hotspot_collector_loop(keywords: list[str], interval_hours: float, mcp_url: str, model: str): """热点自动采集后台循环:遍历 keywords → 搜索 → LLM 分析 → 写入状态缓存""" from .hotspot import search_hotspots, analyze_and_suggest logger.info("热点自动采集已启动, 关键词=%s, 间隔=%s小时", keywords, interval_hours) _auto_log_append(f"🔥 热点自动采集已启动, 每 {interval_hours} 小时采集一次, 关键词: {', '.join(keywords)}") while _hotspot_collector_running.is_set(): for kw in keywords: if not _hotspot_collector_running.is_set(): break try: _auto_log_append(f"🔥 自动采集热点: 搜索「{kw}」...") status, search_result = search_hotspots(kw, "最多点赞", mcp_url) if "❌" in status or not search_result: _auto_log_append(f"⚠️ 热点搜索失败: {status}") continue _auto_log_append(f"🔥 自动采集热点: AI 分析「{kw}」...") a_status, _, _, _ = analyze_and_suggest(model, kw, search_result) _auto_log_append(f"🔥 热点采集「{kw}」: {a_status}") except Exception as e: _auto_log_append(f"⚠️ 热点采集「{kw}」异常: {e}") # 关键词间间隔,避免过快请求 for _ in range(30): if not _hotspot_collector_running.is_set(): break time.sleep(1) # 等待下一轮 wait_seconds = int(interval_hours * 3600) _auto_log_append(f"🔥 热点采集完成一轮, {interval_hours}小时后再次采集") for _ in range(int(wait_seconds / 5)): if not _hotspot_collector_running.is_set(): break time.sleep(5) logger.info("热点自动采集已停止") _auto_log_append("🔥 热点自动采集已停止") def start_hotspot_collector(keywords_str: str, interval_hours: float, mcp_url: str, model: str): """启动热点自动采集""" global _hotspot_collector_thread if _hotspot_collector_running.is_set(): return "⚠️ 热点自动采集已在运行中" keywords = [k.strip() for k in keywords_str.split(",") if k.strip()] if not keywords: return "❌ 请输入至少一个采集关键词" api_key, _, _ = _get_llm_config() if not api_key: return "❌ LLM 未配置,请先在全局设置中配置提供商" _hotspot_collector_running.set() _hotspot_collector_thread = threading.Thread( target=_hotspot_collector_loop, args=(keywords, interval_hours, mcp_url, model), daemon=True, ) _hotspot_collector_thread.start() return f"✅ 热点自动采集已启动 🔥 每 {int(interval_hours)} 小时采集一次, 关键词: {', '.join(keywords)}" def stop_hotspot_collector(): """停止热点自动采集""" if not _hotspot_collector_running.is_set(): return "⚠️ 热点自动采集未在运行" _hotspot_collector_running.clear() return "🛑 热点自动采集已停止" # ================================================== # Windows 开机自启管理 # ================================================== from .autostart import ( is_autostart_enabled, enable_autostart, disable_autostart, toggle_autostart, ) # ================================================== # UI 构建 # ================================================== config = cfg.all _GRADIO_CSS = """ .status-ok { color: #16a34a; font-weight: bold; } .status-err { color: #dc2626; font-weight: bold; } footer { display: none !important; } """