"""
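Message aggregation service.

Merges multiple messages sent by the same user in the same group within a
short period of time into a single message.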
"""
|
|
import time
|
import threading
|
import json
|
from typing import Dict, List, Optional, Any, Tuple
|
from loguru import logger
|
from dataclasses import dataclass, field
|
from datetime import datetime, timedelta
|
|
from config import settings
|
|
|
@dataclass
class PendingMessage:
    """A single message that is waiting to be merged with its neighbours.

    Attributes:
        message_data: The raw message payload this entry was built from.
        timestamp: Time value used to order messages inside an aggregation.
        content: Text of the message.
        from_user: Identifier of the sending user.
        from_group: Identifier of the group the message was sent in.
    """

    message_data: Dict[str, Any]
    timestamp: float
    content: str
    from_user: str
    from_group: str
|
|
|
@dataclass
class MessageAggregation:
    """Messages collected for one (group, user) pair, plus their flush timer.

    Every method acquires ``lock`` while it touches ``messages``, the
    timestamps or ``timer``. ``threading.Lock`` is not reentrant, so these
    methods must not be called by a thread that already holds ``lock``.

    Attributes:
        from_user: Identifier of the user whose messages are collected.
        from_group: Identifier of the group the messages belong to.
        messages: Pending messages, in insertion order.
        first_message_time: Smallest timestamp seen so far (0.0 when empty).
        last_message_time: Largest timestamp seen so far (0.0 when empty).
        timer: Timer that will flush this aggregation, if one is armed.
        lock: Guards all mutable state of this object.
    """

    from_user: str
    from_group: str
    messages: List["PendingMessage"] = field(default_factory=list)
    first_message_time: float = 0.0
    last_message_time: float = 0.0
    timer: Optional[threading.Timer] = None
    lock: threading.Lock = field(default_factory=threading.Lock)

    def add_message(self, message: "PendingMessage"):
        """Append ``message`` and update the first/last timestamps.

        The timestamps track the minimum and maximum ``timestamp`` rather
        than insertion order, so they agree with the ordering used by
        ``get_aggregated_content`` and ``get_latest_message_data`` even when
        messages are added out of order.
        """
        with self.lock:
            self.messages.append(message)
            stamp = message.timestamp
            if not self.first_message_time or stamp < self.first_message_time:
                self.first_message_time = stamp
            self.last_message_time = max(self.last_message_time, stamp)

    def get_aggregated_content(self) -> str:
        """Return all message contents joined by newlines, oldest first.

        Returns:
            The merged text, or ``""`` when no message is pending.
        """
        with self.lock:
            ordered = sorted(self.messages, key=lambda item: item.timestamp)
            return "\n".join(item.content for item in ordered)

    def get_latest_message_data(self) -> Optional[Dict[str, Any]]:
        """Return a copy of the newest message's payload to use as a template.

        The copy is deep: callers typically overwrite nested fields such as
        ``["data"]["content"]``, and a shallow copy would write through to
        the payload still referenced by the stored message.
        NOTE(review): assumes payloads are plain JSON-like structures that
        ``copy.deepcopy`` can handle - confirm against the producer.

        Returns:
            An independent copy of the payload with the largest timestamp,
            or ``None`` when no message is pending.
        """
        import copy

        with self.lock:
            if not self.messages:
                return None
            newest = max(self.messages, key=lambda item: item.timestamp)
            return copy.deepcopy(newest.message_data)

    def clear(self):
        """Drop all pending messages, reset timestamps and cancel the timer."""
        with self.lock:
            self.messages.clear()
            self.first_message_time = 0.0
            self.last_message_time = 0.0
            if self.timer:
                self.timer.cancel()
                self.timer = None
|
|
|
class MessageAggregatorService:
    """Collects bursts of group messages per (group, user) and flushes them.

    Each incoming group message re-arms a per-key ``threading.Timer``; when
    the timer fires, all pending messages for that key are merged into one
    message and handed to the message processor.

    Locking: ``self.lock`` guards the ``aggregations`` registry and is always
    taken before an aggregation's own lock. It is a plain, non-reentrant
    ``threading.Lock``, so no method calls another lock-taking method while
    holding it.
    """

    def __init__(self):
        """Read the aggregation options from ``settings`` and log them."""
        self.aggregations: Dict[str, MessageAggregation] = {}
        self.lock = threading.Lock()
        # Falls back to 15 seconds when the setting is absent.
        self.aggregation_timeout = getattr(settings, 'message_aggregation_timeout', 15)
        # Aggregation is on unless the setting says otherwise.
        self.aggregation_enabled = getattr(settings, 'message_aggregation_enabled', True)

        logger.info(f"消息聚合服务初始化: timeout={self.aggregation_timeout}s, enabled={self.aggregation_enabled}")

    def _get_aggregation_key(self, from_user: str, from_group: str) -> str:
        """Return the registry key for a user within a group."""
        return f"{from_group}:{from_user}"

    def should_aggregate_message(self, message_data: Dict[str, Any]) -> bool:
        """Decide whether a message is eligible for aggregation.

        Args:
            message_data: Raw message payload.

        Returns:
            True only when aggregation is enabled, ``messageType`` is
            ``"80001"``, the message was not sent by ourselves, and
            ``fromUser``, ``fromGroup`` and ``content`` are all non-empty.
            A missing or non-dict ``data`` section yields False.
        """
        if not self.aggregation_enabled:
            return False

        # Only this message type is treated as a group-chat message.
        if message_data.get("messageType") != "80001":
            return False

        data = message_data.get("data")
        if not isinstance(data, dict):
            # Covers both a missing section and an explicit null.
            return False

        # Never aggregate our own outgoing messages.
        if data.get("self", False):
            return False

        return bool(data.get("fromUser") and data.get("fromGroup") and data.get("content"))

    def add_message_to_aggregation(self, message_data: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
        """Queue a message for aggregation, or pass it through untouched.

        Args:
            message_data: Raw message payload.

        Returns:
            ``(True, message_data)`` when the message is not eligible for
            aggregation and the caller should process it right away.
            ``(False, None)`` when the message was queued; it will be
            processed later by the timer callback.
        """
        if not self.should_aggregate_message(message_data):
            return True, message_data

        data = message_data["data"]
        from_user = data["fromUser"]
        from_group = data["fromGroup"]
        aggregation_key = self._get_aggregation_key(from_user, from_group)

        pending_message = PendingMessage(
            message_data=message_data,
            timestamp=time.time(),
            content=data["content"],
            from_user=from_user,
            from_group=from_group,
        )

        # Lookup, append and timer swap happen under one hold of self.lock so
        # a timer callback (which detaches the entry under the same lock)
        # sees either all of this message or none of it.
        with self.lock:
            aggregation = self.aggregations.get(aggregation_key)
            if aggregation is None:
                aggregation = MessageAggregation(from_user=from_user, from_group=from_group)
                self.aggregations[aggregation_key] = aggregation

            timer = threading.Timer(
                self.aggregation_timeout,
                self._process_aggregated_messages,
                args=(aggregation_key,),
            )

            # Re-arm: each new message restarts the countdown.
            with aggregation.lock:
                if aggregation.timer:
                    aggregation.timer.cancel()
                aggregation.timer = timer

            # add_message takes aggregation.lock itself, so call it unlocked.
            aggregation.add_message(pending_message)
            total_messages = len(aggregation.messages)
            timer.start()

        logger.info(f"消息已添加到聚合: user={from_user}, group={from_group}, "
                    f"total_messages={total_messages}, timeout={self.aggregation_timeout}s")

        return False, None

    def _process_aggregated_messages(self, aggregation_key: str):
        """Flush one aggregation: merge its messages and process the result.

        Runs as the timer callback and is also called by
        ``force_process_aggregation``. Must be called without ``self.lock``
        held.

        Args:
            aggregation_key: Registry key of the aggregation to flush.
        """
        # Detach the entry atomically. From here on this thread owns it, and
        # messages arriving meanwhile start a fresh aggregation instead of
        # being appended to (and then wiped with) this one.
        with self.lock:
            aggregation = self.aggregations.pop(aggregation_key, None)

        if aggregation is None:
            logger.warning(f"聚合键不存在: {aggregation_key}")
            return

        try:
            aggregated_content = aggregation.get_aggregated_content()
            latest_message_data = aggregation.get_latest_message_data()
            message_count = len(aggregation.messages)

            # Also cancels a timer that was re-armed just before the detach.
            aggregation.clear()
            logger.debug(f"聚合数据已清理: {aggregation_key}")

            if not aggregated_content or not latest_message_data:
                logger.warning(f"聚合数据为空: {aggregation_key}")
                return

            # Build a new "data" dict rather than assigning into the
            # template's, so the original payload is never modified.
            aggregated_message_data = dict(latest_message_data)
            aggregated_message_data["data"] = {
                **latest_message_data["data"],
                "content": aggregated_content,
            }

            logger.info(f"处理聚合消息: user={aggregation.from_user}, group={aggregation.from_group}, "
                        f"message_count={message_count}, aggregated_content_length={len(aggregated_content)}")

            # Imported lazily to avoid a circular import.
            from app.services.message_processor import message_processor
            success = message_processor.process_single_message(aggregated_message_data)

            if success:
                logger.info(f"聚合消息处理成功: user={aggregation.from_user}, group={aggregation.from_group}")
            else:
                logger.error(f"聚合消息处理失败: user={aggregation.from_user}, group={aggregation.from_group}")

        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception(f"处理聚合消息异常: aggregation_key={aggregation_key}, error={str(e)}")
            # Clear only the detached object; the registry may already hold
            # a newer aggregation under the same key.
            aggregation.clear()

    def _cleanup_aggregation(self, aggregation_key: str):
        """Remove an aggregation from the registry and release its state.

        Must be called without ``self.lock`` held.

        Args:
            aggregation_key: Registry key of the aggregation to discard.
        """
        try:
            with self.lock:
                aggregation = self.aggregations.pop(aggregation_key, None)
                if aggregation is not None:
                    aggregation.clear()
                    logger.debug(f"聚合数据已清理: {aggregation_key}")
        except Exception as e:
            logger.error(f"清理聚合数据异常: aggregation_key={aggregation_key}, error={str(e)}")

    def get_aggregation_status(self) -> Dict[str, Any]:
        """Return a snapshot of the configuration and pending aggregations.

        Returns:
            A dict with ``enabled``, ``timeout``, ``active_aggregations`` and
            ``aggregations`` (per-key user, group, message count, first/last
            message time and seconds elapsed since the first message).
        """
        now = time.time()
        with self.lock:
            details: Dict[str, Any] = {}
            for key, aggregation in self.aggregations.items():
                with aggregation.lock:
                    first_seen = aggregation.first_message_time
                    details[key] = {
                        "from_user": aggregation.from_user,
                        "from_group": aggregation.from_group,
                        "message_count": len(aggregation.messages),
                        "first_message_time": first_seen,
                        "last_message_time": aggregation.last_message_time,
                        "elapsed_time": now - first_seen if first_seen else 0,
                    }

            return {
                "enabled": self.aggregation_enabled,
                "timeout": self.aggregation_timeout,
                "active_aggregations": len(self.aggregations),
                "aggregations": details,
            }

    def force_process_aggregation(self, from_user: str, from_group: str) -> bool:
        """Flush the pending aggregation of a user in a group immediately.

        Args:
            from_user: User identifier.
            from_group: Group identifier.

        Returns:
            False when nothing is pending for that user and group. True when
            a pending aggregation was found and handed to processing; the
            processor's own result is only logged, not returned.
        """
        aggregation_key = self._get_aggregation_key(from_user, from_group)

        with self.lock:
            is_pending = aggregation_key in self.aggregations

        if not is_pending:
            logger.warning(f"没有找到待聚合的消息: user={from_user}, group={from_group}")
            return False

        logger.info(f"强制处理聚合消息: user={from_user}, group={from_group}")
        # Called after releasing self.lock: the flush acquires it again.
        self._process_aggregated_messages(aggregation_key)
        return True

    def stop(self):
        """Cancel every pending timer and drop all aggregations.

        Messages still waiting in an aggregation are discarded, not
        processed.
        """
        logger.info("正在停止消息聚合服务...")

        with self.lock:
            for aggregation_key, aggregation in self.aggregations.items():
                try:
                    with aggregation.lock:
                        if aggregation.timer:
                            aggregation.timer.cancel()
                            logger.debug(f"已取消聚合定时器: {aggregation_key}")
                except Exception as e:
                    logger.error(f"取消聚合定时器异常: {aggregation_key}, error={str(e)}")

            self.aggregations.clear()

        logger.info("消息聚合服务已停止")
|
|
|
# 全局消息聚合服务实例
|
# Instantiated at import time; the constructor reads the aggregation options
# from `settings` and logs them.
message_aggregator = MessageAggregatorService()
|