"""
Message aggregation service.

Merges several messages sent by the same user in the same group within a short
time window into a single message before it is handed to the message processor.
"""
import time
import threading
import json
from typing import Dict, List, Optional, Any, Tuple
from loguru import logger
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from config import settings

# ``messageType`` value identifying a group-chat message; only these are aggregated.
GROUP_MESSAGE_TYPE = "80001"


@dataclass
class PendingMessage:
    """A single message waiting to be aggregated."""

    message_data: Dict[str, Any]  # the raw incoming message payload
    timestamp: float  # time.time() at which the message was queued
    content: str  # text taken from message_data["data"]["content"]
    from_user: str
    from_group: str


@dataclass
class MessageAggregation:
    """Messages buffered for one (group, user) pair, plus their flush timer.

    Every mutable field is guarded by ``lock`` (a non-reentrant ``threading.Lock``).
    """

    from_user: str
    from_group: str
    messages: List[PendingMessage] = field(default_factory=list)
    first_message_time: float = 0.0  # timestamp of the first buffered message; 0.0 when empty
    last_message_time: float = 0.0  # timestamp of the latest buffered message; 0.0 when empty
    timer: Optional[threading.Timer] = None  # pending flush timer, restarted on every new message
    lock: threading.Lock = field(default_factory=threading.Lock)

    def add_message(self, message: PendingMessage) -> None:
        """Append ``message`` and update the first/last message timestamps."""
        with self.lock:
            self.messages.append(message)
            if not self.first_message_time:
                self.first_message_time = message.timestamp
            self.last_message_time = message.timestamp

    def get_aggregated_content(self) -> str:
        """Return all message contents ordered by timestamp and joined with newlines.

        Returns an empty string when nothing is buffered.
        """
        with self.lock:
            if not self.messages:
                return ""
            sorted_messages = sorted(self.messages, key=lambda msg: msg.timestamp)
            return "\n".join(msg.content for msg in sorted_messages)

    def get_latest_message_data(self) -> Optional[Dict[str, Any]]:
        """Return a copy of the newest message's payload, used as a template.

        The copy is shallow: nested values such as ``"data"`` are still shared
        with the buffered message, so callers must not mutate them in place.

        Returns:
            The copied payload, or None when nothing is buffered.
        """
        with self.lock:
            if not self.messages:
                return None
            latest_message = max(self.messages, key=lambda msg: msg.timestamp)
            return latest_message.message_data.copy()

    def clear(self) -> None:
        """Drop all buffered messages, reset the timestamps and cancel the timer."""
        with self.lock:
            self.messages.clear()
            self.first_message_time = 0.0
            self.last_message_time = 0.0
            if self.timer:
                # No-op if the timer has already fired (e.g. we are running inside it).
                self.timer.cancel()
                self.timer = None


class MessageAggregatorService:
    """Buffers group messages per (group, user) and flushes them after a quiet period.

    Every new message restarts a ``threading.Timer`` of ``aggregation_timeout``
    seconds; when it fires, the buffered messages are merged into one and passed
    to ``message_processor.process_single_message``.

    Locking: ``self.lock`` guards ``self.aggregations`` and is always taken
    before an aggregation's own lock. Both are non-reentrant ``threading.Lock``
    objects, so no code path re-acquires a lock it already holds.
    """

    def __init__(self):
        self.aggregations: Dict[str, MessageAggregation] = {}
        self.lock = threading.Lock()
        # Optional settings; fall back to a 15-second window and "enabled".
        self.aggregation_timeout = getattr(settings, 'message_aggregation_timeout', 15)
        self.aggregation_enabled = getattr(settings, 'message_aggregation_enabled', True)
        logger.info(f"消息聚合服务初始化: timeout={self.aggregation_timeout}s, enabled={self.aggregation_enabled}")

    def _get_aggregation_key(self, from_user: str, from_group: str) -> str:
        """Return the dictionary key identifying a (group, user) aggregation."""
        return f"{from_group}:{from_user}"

    def should_aggregate_message(self, message_data: Dict[str, Any]) -> bool:
        """Decide whether ``message_data`` should be buffered for aggregation.

        A message is aggregated only when the service is enabled, it is a
        group-chat message (``messageType == "80001"``), it was not sent by
        ourselves (``data.self`` falsy) and ``data`` carries a non-empty
        ``fromUser``, ``fromGroup`` and ``content``.

        Args:
            message_data: raw message payload.

        Returns:
            True if the message should be aggregated.
        """
        if not self.aggregation_enabled:
            return False

        if message_data.get("messageType") != GROUP_MESSAGE_TYPE:
            return False

        # ``or {}`` also covers an explicit ``"data": None``.
        data = message_data.get("data") or {}

        if data.get("self", False):
            return False

        return bool(data.get("fromUser") and data.get("fromGroup") and data.get("content"))

    def add_message_to_aggregation(self, message_data: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
        """Buffer a message for aggregation, or pass it through if it is not eligible.

        Args:
            message_data: raw message payload.

        Returns:
            ``(needs_processing, message)``:
            - ``(True, message_data)``: the message is not aggregated; the caller
              should process it immediately.
            - ``(False, None)``: the message was buffered; the flush timer will
              later process it merged with the other buffered messages.
        """
        if not self.should_aggregate_message(message_data):
            return True, message_data

        data = message_data.get("data") or {}
        from_user = data.get("fromUser")
        from_group = data.get("fromGroup")
        content = data.get("content", "")
        aggregation_key = self._get_aggregation_key(from_user, from_group)

        pending_message = PendingMessage(
            message_data=message_data,
            timestamp=time.time(),
            content=content,
            from_user=from_user,
            from_group=from_group,
        )

        # All bookkeeping happens under the service lock so that a concurrent
        # flush either sees this message in the aggregation it detaches, or
        # detaches first and this message starts a fresh aggregation.
        with self.lock:
            aggregation = self.aggregations.get(aggregation_key)
            if aggregation is None:
                aggregation = MessageAggregation(from_user=from_user, from_group=from_group)
                self.aggregations[aggregation_key] = aggregation

            # Restart the quiet-period timer.
            with aggregation.lock:
                if aggregation.timer:
                    aggregation.timer.cancel()

            aggregation.add_message(pending_message)

            # Pass the aggregation object along so a stale timer (one that had
            # already fired when cancel() was called) can tell whether the
            # aggregation it was started for is still the registered one.
            timer = threading.Timer(
                self.aggregation_timeout,
                self._process_aggregated_messages,
                args=(aggregation_key, aggregation),
            )
            with aggregation.lock:
                aggregation.timer = timer
                total_messages = len(aggregation.messages)
            timer.start()

        logger.info(f"消息已添加到聚合: user={from_user}, group={from_group}, "
                    f"total_messages={total_messages}, timeout={self.aggregation_timeout}s")

        return False, None

    def _process_aggregated_messages(self, aggregation_key: str,
                                     expected_aggregation: Optional[MessageAggregation] = None):
        """Flush one aggregation: merge its messages and hand them to the processor.

        Used as the flush-timer callback and by ``force_process_aggregation``.
        The aggregation is removed from ``self.aggregations`` atomically before
        its contents are read, so messages arriving while this runs start a new
        aggregation instead of being discarded by the cleanup.

        Must not be called while holding ``self.lock``.

        Args:
            aggregation_key: key of the aggregation to flush.
            expected_aggregation: when given (timer path), flush only if this
                exact aggregation is still registered under the key; otherwise
                the timer is stale and nothing is done.
        """
        aggregation: Optional[MessageAggregation] = None
        try:
            with self.lock:
                aggregation = self.aggregations.get(aggregation_key)
                if aggregation is None:
                    logger.warning(f"聚合键不存在: {aggregation_key}")
                    return
                if expected_aggregation is not None and aggregation is not expected_aggregation:
                    # A newer aggregation owns this key and has its own timer.
                    logger.debug(f"忽略过期的聚合定时器: {aggregation_key}")
                    return
                del self.aggregations[aggregation_key]

            # The aggregation is now detached and unreachable from other
            # threads: snapshot it, then release it (clear() also cancels its
            # timer if that is still pending).
            with aggregation.lock:
                message_count = len(aggregation.messages)
            aggregated_content = aggregation.get_aggregated_content()
            latest_message_data = aggregation.get_latest_message_data()
            aggregation.clear()
            logger.debug(f"聚合数据已清理: {aggregation_key}")

            if not aggregated_content or not latest_message_data:
                logger.warning(f"聚合数据为空: {aggregation_key}")
                return

            # Build a payload with its own "data" dict: get_latest_message_data()
            # is a shallow copy, so writing into the shared "data" dict would
            # overwrite the content of the caller's original message.
            aggregated_message_data = dict(latest_message_data)
            aggregated_message_data["data"] = {
                **(latest_message_data.get("data") or {}),
                "content": aggregated_content,
            }

            logger.info(f"处理聚合消息: user={aggregation.from_user}, group={aggregation.from_group}, "
                        f"message_count={message_count}, aggregated_content_length={len(aggregated_content)}")

            # Deferred import to avoid a circular dependency.
            from app.services.message_processor import message_processor
            success = message_processor.process_single_message(aggregated_message_data)

            if success:
                logger.info(f"聚合消息处理成功: user={aggregation.from_user}, group={aggregation.from_group}")
            else:
                logger.error(f"聚合消息处理失败: user={aggregation.from_user}, group={aggregation.from_group}")

        except Exception as e:
            logger.exception(f"处理聚合消息异常: aggregation_key={aggregation_key}, error={str(e)}")
            # Release only the aggregation we detached; never delete the key
            # from self.aggregations, which may now belong to newer messages.
            if aggregation is not None:
                aggregation.clear()

    def _cleanup_aggregation(self, aggregation_key: str):
        """Remove the aggregation stored under ``aggregation_key`` and clear it.

        Acquires ``self.lock``; must not be called while already holding it.

        Args:
            aggregation_key: key of the aggregation to remove.
        """
        try:
            with self.lock:
                aggregation = self.aggregations.pop(aggregation_key, None)
                if aggregation is not None:
                    aggregation.clear()
                    logger.debug(f"聚合数据已清理: {aggregation_key}")
        except Exception as e:
            logger.exception(f"清理聚合数据异常: aggregation_key={aggregation_key}, error={str(e)}")

    def get_aggregation_status(self) -> Dict[str, Any]:
        """Return a snapshot of the service configuration and active aggregations.

        Returns:
            Dict with ``enabled``, ``timeout``, ``active_aggregations`` and a
            per-key ``aggregations`` map (user, group, message count, first/last
            message time and seconds elapsed since the first message).
        """
        with self.lock:
            status = {
                "enabled": self.aggregation_enabled,
                "timeout": self.aggregation_timeout,
                "active_aggregations": len(self.aggregations),
                "aggregations": {}
            }

            for key, aggregation in self.aggregations.items():
                with aggregation.lock:
                    status["aggregations"][key] = {
                        "from_user": aggregation.from_user,
                        "from_group": aggregation.from_group,
                        "message_count": len(aggregation.messages),
                        "first_message_time": aggregation.first_message_time,
                        "last_message_time": aggregation.last_message_time,
                        "elapsed_time": time.time() - aggregation.first_message_time if aggregation.first_message_time else 0
                    }

            return status

    def force_process_aggregation(self, from_user: str, from_group: str) -> bool:
        """Flush the pending aggregation for a user/group immediately.

        Args:
            from_user: user ID.
            from_group: group ID.

        Returns:
            False if no aggregation was pending for the pair, True otherwise.
        """
        aggregation_key = self._get_aggregation_key(from_user, from_group)

        with self.lock:
            exists = aggregation_key in self.aggregations

        if not exists:
            logger.warning(f"没有找到待聚合的消息: user={from_user}, group={from_group}")
            return False

        logger.info(f"强制处理聚合消息: user={from_user}, group={from_group}")
        # Called outside self.lock: _process_aggregated_messages acquires it.
        self._process_aggregated_messages(aggregation_key)
        return True

    def stop(self):
        """Stop the service: cancel every pending flush timer and drop all buffers.

        Buffered messages that have not been flushed yet are discarded.
        """
        logger.info("正在停止消息聚合服务...")

        with self.lock:
            for aggregation_key, aggregation in self.aggregations.items():
                try:
                    with aggregation.lock:
                        if aggregation.timer:
                            aggregation.timer.cancel()
                            logger.debug(f"已取消聚合定时器: {aggregation_key}")
                except Exception as e:
                    logger.error(f"取消聚合定时器异常: {aggregation_key}, error={str(e)}")

            self.aggregations.clear()

        logger.info("消息聚合服务已停止")


# Module-wide service instance.
message_aggregator = MessageAggregatorService()