yj
2025-08-07 2e391d599d08ea7a7c11442bc2845a1191494c3d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
"""
消息聚合服务
实现同一群组中同一用户短时间内多条消息的聚合功能
"""
 
import copy
import json
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple

from loguru import logger

from config import settings
 
 
@dataclass
class PendingMessage:
    """A single message waiting to be merged into an aggregation."""
    # Message payload this entry was created from.
    message_data: Dict[str, Any]
    # Time value used to order messages inside an aggregation.
    timestamp: float
    # Text of this message; the texts of all buffered messages are joined
    # with newlines when the aggregation is read out.
    content: str
    # Identifiers of the sending user and of the group it was sent in.
    from_user: str
    from_group: str
 
 
@dataclass
class MessageAggregation:
    """Messages buffered for one (group, user) pair.

    Every method acquires ``lock`` itself. The lock is a plain
    ``threading.Lock`` (not re-entrant), so callers must NOT already hold it
    when calling any of the methods below.
    """
    from_user: str
    from_group: str
    # Buffered messages, in the order they were added.
    messages: List["PendingMessage"] = field(default_factory=list)
    # Timestamp of the first / most recently added message; 0.0 while empty.
    first_message_time: float = 0.0
    last_message_time: float = 0.0
    # Timer attached by the owner of this aggregation; cancelled by clear().
    timer: Optional[threading.Timer] = None
    # Guards every field above.
    lock: threading.Lock = field(default_factory=threading.Lock)

    def add_message(self, message: "PendingMessage"):
        """Append ``message`` and update the first/last timestamps.

        Args:
            message: The pending message to buffer.
        """
        with self.lock:
            self.messages.append(message)
            if not self.first_message_time:
                self.first_message_time = message.timestamp
            self.last_message_time = message.timestamp

    def get_aggregated_content(self) -> str:
        """Return the buffered message texts joined by newlines.

        Messages are ordered by timestamp (stable for equal timestamps).

        Returns:
            The joined text, or ``""`` when nothing is buffered.
        """
        with self.lock:
            if not self.messages:
                return ""

            ordered = sorted(self.messages, key=lambda m: m.timestamp)
            return "\n".join(m.content for m in ordered)

    def get_latest_message_data(self) -> Optional[Dict[str, Any]]:
        """Return an independent copy of the newest message's payload.

        The result is meant to be used as a template that the caller edits
        (e.g. replacing the nested ``data.content``). A deep copy is returned
        so such edits never leak back into the buffered original message; a
        shallow ``dict.copy()`` would still share the nested dictionaries.

        Returns:
            A deep copy of the payload with the greatest timestamp, or
            ``None`` when nothing is buffered.
        """
        with self.lock:
            if not self.messages:
                return None

            latest = max(self.messages, key=lambda m: m.timestamp)
            return copy.deepcopy(latest.message_data)

    def clear(self):
        """Drop all buffered messages, reset timestamps and cancel the timer."""
        with self.lock:
            self.messages.clear()
            self.first_message_time = 0.0
            self.last_message_time = 0.0
            if self.timer:
                self.timer.cancel()
                self.timer = None
 
 
class MessageAggregatorService:
    """Merge bursts of messages from one user in one group into a single message.

    Each qualifying message is buffered under the key ``"<group>:<user>"`` and
    (re)starts a ``threading.Timer``. When the timer fires without a newer
    message having arrived, the buffered texts are joined and handed to
    ``message_processor.process_single_message`` as one message.

    Locking: ``self.lock`` guards ``self.aggregations`` and is always taken
    BEFORE an aggregation's own lock. An aggregation is only ever mutated
    while it is reachable from ``self.aggregations`` under ``self.lock``, or
    after it has been detached from the dict by the thread that flushes it.
    """

    def __init__(self):
        """Read the aggregation settings and start with nothing buffered."""
        self.aggregations: Dict[str, MessageAggregation] = {}
        self.lock = threading.Lock()
        # Quiet period in seconds (passed to threading.Timer); defaults to 15.
        self.aggregation_timeout = getattr(settings, 'message_aggregation_timeout', 15)
        # Master switch; defaults to enabled.
        self.aggregation_enabled = getattr(settings, 'message_aggregation_enabled', True)

        logger.info(f"消息聚合服务初始化: timeout={self.aggregation_timeout}s, enabled={self.aggregation_enabled}")

    def _get_aggregation_key(self, from_user: str, from_group: str) -> str:
        """Return the dictionary key for a (user, group) pair."""
        return f"{from_group}:{from_user}"

    def should_aggregate_message(self, message_data: Dict[str, Any]) -> bool:
        """Decide whether a message qualifies for aggregation.

        A message qualifies when aggregation is enabled, its ``messageType``
        is ``"80001"``, it was not sent by ourselves (``data.self`` falsy) and
        ``data`` carries non-empty ``fromUser``, ``fromGroup`` and ``content``.

        Args:
            message_data: The incoming message.

        Returns:
            True if the message should be buffered, False otherwise.
        """
        if not self.aggregation_enabled:
            return False

        if message_data.get("messageType") != "80001":
            return False

        # A missing, null or non-dict payload cannot be aggregated; treat it
        # as "process directly" instead of failing on ``.get``.
        data = message_data.get("data")
        if not isinstance(data, dict):
            return False

        # Skip messages we sent ourselves.
        if data.get("self", False):
            return False

        if not data.get("fromUser") or not data.get("fromGroup") or not data.get("content"):
            return False

        return True

    def add_message_to_aggregation(self, message_data: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
        """Buffer a message, or tell the caller to process it right away.

        Args:
            message_data: The incoming message.

        Returns:
            ``(True, message_data)`` when the message does not qualify for
            aggregation and must be processed by the caller immediately;
            ``(False, None)`` when the message was buffered and will be
            processed later by the flush timer.
        """
        if not self.should_aggregate_message(message_data):
            return True, message_data

        data = message_data["data"]
        from_user = data.get("fromUser")
        from_group = data.get("fromGroup")

        aggregation_key = self._get_aggregation_key(from_user, from_group)
        pending_message = PendingMessage(
            message_data=message_data,
            timestamp=time.time(),
            content=data.get("content", ""),
            from_user=from_user,
            from_group=from_group
        )

        # Lookup, append and timer re-arm happen as ONE step under the service
        # lock. Otherwise a concurrent flush could detach/clear the
        # aggregation between the lookup and the append and the message would
        # be silently lost.
        with self.lock:
            aggregation = self.aggregations.get(aggregation_key)
            if aggregation is None:
                aggregation = MessageAggregation(
                    from_user=from_user,
                    from_group=from_group
                )
                self.aggregations[aggregation_key] = aggregation

            # Restart the quiet period: drop the previous timer ...
            with aggregation.lock:
                if aggregation.timer:
                    aggregation.timer.cancel()

            # (add_message takes aggregation.lock itself, so it must be
            # called while we are not holding that lock.)
            aggregation.add_message(pending_message)

            # ... and arm a fresh one.
            timer = threading.Timer(
                self.aggregation_timeout,
                self._process_aggregated_messages,
                args=(aggregation_key,)
            )
            with aggregation.lock:
                aggregation.timer = timer
                total_messages = len(aggregation.messages)

            timer.start()

        logger.info(f"消息已添加到聚合: user={from_user}, group={from_group}, "
                   f"total_messages={total_messages}, timeout={self.aggregation_timeout}s")

        return False, None

    def _process_aggregated_messages(self, aggregation_key: str) -> bool:
        """Flush one aggregation (timer callback, also used for forced flushes).

        The aggregation is detached from ``self.aggregations`` before it is
        read, so messages arriving from then on start a new aggregation
        instead of being dropped together with the old one.

        Args:
            aggregation_key: Key returned by ``_get_aggregation_key``.

        Returns:
            True if the merged message was processed successfully; False if
            there was nothing to flush, the call came from a superseded
            timer, or processing failed.
        """
        detached = None
        try:
            with self.lock:
                aggregation = self.aggregations.get(aggregation_key)
                if aggregation is None:
                    logger.warning(f"聚合键不存在: {aggregation_key}")
                    return False

                # A Timer callback runs on the Timer thread itself. If that
                # thread is no longer the aggregation's current timer, it was
                # cancelled too late (a newer message re-armed the timer) and
                # the newer timer is responsible for flushing.
                caller = threading.current_thread()
                if isinstance(caller, threading.Timer):
                    with aggregation.lock:
                        superseded = aggregation.timer is not caller
                    if superseded:
                        logger.debug(f"忽略已过期的聚合定时器: {aggregation_key}")
                        return False

                del self.aggregations[aggregation_key]
                detached = aggregation

            # From here on no other thread can reach ``detached``.
            aggregated_content = detached.get_aggregated_content()
            latest_message_data = detached.get_latest_message_data()
            with detached.lock:
                message_count = len(detached.messages)
            from_user = detached.from_user
            from_group = detached.from_group
            detached.clear()

            if not aggregated_content or not latest_message_data:
                logger.warning(f"聚合数据为空: {aggregation_key}")
                return False

            # Build the merged message without touching the nested "data"
            # dict of the original message: copy both levels before writing.
            aggregated_message_data = dict(latest_message_data)
            merged_payload = dict(latest_message_data.get("data") or {})
            merged_payload["content"] = aggregated_content
            aggregated_message_data["data"] = merged_payload

            logger.info(f"处理聚合消息: user={from_user}, group={from_group}, "
                       f"message_count={message_count}, aggregated_content_length={len(aggregated_content)}")

            # Imported lazily to avoid a circular import.
            from app.services.message_processor import message_processor
            success = message_processor.process_single_message(aggregated_message_data)

            if success:
                logger.info(f"聚合消息处理成功: user={from_user}, group={from_group}")
            else:
                logger.error(f"聚合消息处理失败: user={from_user}, group={from_group}")
            return bool(success)

        except Exception as e:
            # Boundary of the timer thread: log with traceback and swallow.
            logger.exception(f"处理聚合消息异常: aggregation_key={aggregation_key}, error={str(e)}")
            # Only release the aggregation this call detached. Cleaning up by
            # key here could delete a NEW aggregation created for the same
            # key while the processor was running.
            if detached is not None:
                detached.clear()
            return False

    def _cleanup_aggregation(self, aggregation_key: str):
        """Remove the aggregation stored under ``aggregation_key``, if any.

        The removed aggregation is cleared, which also cancels its timer.

        Args:
            aggregation_key: Key returned by ``_get_aggregation_key``.
        """
        try:
            with self.lock:
                aggregation = self.aggregations.pop(aggregation_key, None)
                if aggregation is not None:
                    aggregation.clear()
                    logger.debug(f"聚合数据已清理: {aggregation_key}")
        except Exception as e:
            logger.error(f"清理聚合数据异常: aggregation_key={aggregation_key}, error={str(e)}")

    def get_aggregation_status(self) -> Dict[str, Any]:
        """Return a snapshot of the service configuration and pending aggregations.

        Returns:
            A dict with ``enabled``, ``timeout``, ``active_aggregations`` and
            ``aggregations`` (per key: user, group, message count, first/last
            message time and seconds elapsed since the first message).
        """
        with self.lock:
            status = {
                "enabled": self.aggregation_enabled,
                "timeout": self.aggregation_timeout,
                "active_aggregations": len(self.aggregations),
                "aggregations": {}
            }

            for key, aggregation in self.aggregations.items():
                with aggregation.lock:
                    first_time = aggregation.first_message_time
                    status["aggregations"][key] = {
                        "from_user": aggregation.from_user,
                        "from_group": aggregation.from_group,
                        "message_count": len(aggregation.messages),
                        "first_message_time": first_time,
                        "last_message_time": aggregation.last_message_time,
                        "elapsed_time": time.time() - first_time if first_time else 0
                    }

            return status

    def force_process_aggregation(self, from_user: str, from_group: str) -> bool:
        """Flush the aggregation of a (user, group) pair without waiting for its timer.

        Args:
            from_user: User id.
            from_group: Group id.

        Returns:
            True if an aggregation existed and its merged message was
            processed successfully, False otherwise.
        """
        aggregation_key = self._get_aggregation_key(from_user, from_group)

        with self.lock:
            if aggregation_key not in self.aggregations:
                logger.warning(f"没有找到待聚合的消息: user={from_user}, group={from_group}")
                return False

        logger.info(f"强制处理聚合消息: user={from_user}, group={from_group}")
        # The aggregation may be flushed by its timer between the check above
        # and this call; that case is reported as False by the callee.
        return self._process_aggregated_messages(aggregation_key)

    def stop(self):
        """Cancel every pending timer and forget all aggregations.

        Buffered messages are discarded, not processed.
        """
        logger.info("正在停止消息聚合服务...")

        with self.lock:
            for aggregation_key, aggregation in self.aggregations.items():
                try:
                    with aggregation.lock:
                        if aggregation.timer:
                            aggregation.timer.cancel()
                    logger.debug(f"已取消聚合定时器: {aggregation_key}")
                except Exception as e:
                    logger.error(f"取消聚合定时器异常: {aggregation_key}, error={str(e)}")

            self.aggregations.clear()

        logger.info("消息聚合服务已停止")
 
 
# Module-level shared MessageAggregatorService instance. It is constructed at
# import time, so importing this module reads the aggregation settings and
# emits the service's initialization log line.
message_aggregator = MessageAggregatorService()