# NOTE(review): scraped blame-view header - "yj", 2025-08-07, commit
# 2e391d599d08ea7a7c11442bc2845a1191494c3d (meaning unconfirmed). The 1-405
# line-number gutter that followed was extraction residue and has been dropped.
"""
Tests for the message aggregation service.
"""
 
import time
import threading
import pytest
from unittest.mock import patch, MagicMock
from app.services.message_aggregator import MessageAggregatorService, PendingMessage, MessageAggregation
 
 
class TestMessageAggregatorService:
    """Tests for MessageAggregatorService.

    Each test runs against a fresh service instance whose aggregation
    timeout is shortened to 2 seconds (see ``setup_method``).
    """

    # Sender identity shared by every fixture message in this class.
    _USER = "wxid_test123"
    _GROUP = "group123@chatroom"
    # Key under which get_aggregation_status() lists that sender's aggregation.
    _STATUS_KEY = "group123@chatroom:wxid_test123"

    def _group_message(self, content="测试消息", message_type="80001", from_self=False):
        """Return a new inbound message dict for the shared user/group.

        A fresh dict is built on every call so tests never share state
        through the fixture.
        """
        return {
            "messageType": message_type,
            "data": {
                "fromUser": self._USER,
                "fromGroup": self._GROUP,
                "content": content,
                "self": from_self,
            },
        }

    def setup_method(self):
        """Create the service under test and enable aggregation."""
        service = MessageAggregatorService()
        # Short timeout so the timer-based test finishes quickly.
        service.aggregation_timeout = 2
        service.aggregation_enabled = True
        self.aggregator = service

    def teardown_method(self):
        """Stop the service after each test."""
        self.aggregator.stop()

    def test_should_aggregate_message_valid(self):
        """A complete message from another user is accepted for aggregation."""
        verdict = self.aggregator.should_aggregate_message(self._group_message())
        assert verdict is True

    def test_should_aggregate_message_disabled(self):
        """Nothing is aggregated while the feature is switched off."""
        self.aggregator.aggregation_enabled = False

        verdict = self.aggregator.should_aggregate_message(self._group_message())
        assert verdict is False

    def test_should_aggregate_message_wrong_type(self):
        """A message of another message type is not aggregated."""
        # "80002" is not a group-chat message type.
        other_type = self._group_message(message_type="80002")

        verdict = self.aggregator.should_aggregate_message(other_type)
        assert verdict is False

    def test_should_aggregate_message_self_message(self):
        """A message flagged as sent by ourselves is not aggregated."""
        own_message = self._group_message(from_self=True)

        verdict = self.aggregator.should_aggregate_message(own_message)
        assert verdict is False

    def test_should_aggregate_message_missing_fields(self):
        """A message lacking a required field is not aggregated."""
        incomplete = self._group_message()
        # Drop the group id to simulate the missing field.
        del incomplete["data"]["fromGroup"]

        verdict = self.aggregator.should_aggregate_message(incomplete)
        assert verdict is False

    def test_add_message_to_aggregation_no_aggregation(self):
        """With aggregation off, the message is handed straight back."""
        self.aggregator.aggregation_enabled = False
        original = self._group_message()

        process_now, returned = self.aggregator.add_message_to_aggregation(original)

        assert process_now is True
        assert returned == original

    def test_add_message_to_aggregation_first_message(self):
        """The first message is held back and opens one aggregation."""
        first = self._group_message("第一条消息")

        process_now, returned = self.aggregator.add_message_to_aggregation(first)

        assert process_now is False
        assert returned is None

        # Exactly one aggregation should now be pending for this sender.
        snapshot = self.aggregator.get_aggregation_status()
        assert snapshot["active_aggregations"] == 1
        assert self._STATUS_KEY in snapshot["aggregations"]

    def test_add_message_to_aggregation_multiple_messages(self):
        """Consecutive messages from one sender land in the same aggregation."""
        for text in ("第一条消息", "第二条消息"):
            process_now, _ = self.aggregator.add_message_to_aggregation(
                self._group_message(text)
            )
            # Every message is held back rather than processed immediately.
            assert process_now is False

        snapshot = self.aggregator.get_aggregation_status()
        entry = snapshot["aggregations"][self._STATUS_KEY]
        assert entry["message_count"] == 2

    @patch('app.services.message_processor.message_processor')
    def test_process_aggregated_messages_timeout(self, mock_message_processor):
        """A pending aggregation is handed to the processor after the timeout."""
        mock_message_processor.process_single_message.return_value = True

        process_now, _ = self.aggregator.add_message_to_aggregation(
            self._group_message()
        )
        assert process_now is False

        # Sleep past the 2 second timeout configured in setup_method.
        time.sleep(2.5)

        # The processor received exactly one call ...
        mock_message_processor.process_single_message.assert_called_once()

        # ... and no aggregation is left pending.
        snapshot = self.aggregator.get_aggregation_status()
        assert snapshot["active_aggregations"] == 0

    def test_get_aggregation_status(self):
        """The status report reflects configuration and pending aggregations."""
        # Before any message arrives.
        idle = self.aggregator.get_aggregation_status()
        assert idle["enabled"] is True
        assert idle["timeout"] == 2
        assert idle["active_aggregations"] == 0
        assert idle["aggregations"] == {}

        self.aggregator.add_message_to_aggregation(self._group_message())

        # After one message has been queued.
        busy = self.aggregator.get_aggregation_status()
        assert busy["active_aggregations"] == 1
        assert self._STATUS_KEY in busy["aggregations"]

        entry = busy["aggregations"][self._STATUS_KEY]
        assert entry["from_user"] == self._USER
        assert entry["from_group"] == self._GROUP
        assert entry["message_count"] == 1

    @patch('app.services.message_processor.message_processor')
    def test_force_process_aggregation(self, mock_message_processor):
        """Forcing a pending aggregation processes it and removes it."""
        mock_message_processor.process_single_message.return_value = True

        self.aggregator.add_message_to_aggregation(self._group_message())

        # Flush without waiting for the timer.
        flushed = self.aggregator.force_process_aggregation(self._USER, self._GROUP)
        assert flushed is True

        mock_message_processor.process_single_message.assert_called_once()

        snapshot = self.aggregator.get_aggregation_status()
        assert snapshot["active_aggregations"] == 0

    def test_force_process_aggregation_not_found(self):
        """Forcing an aggregation that does not exist reports False."""
        flushed = self.aggregator.force_process_aggregation(self._USER, self._GROUP)
        assert flushed is False
 
 
class TestPendingMessage:
    """Tests for the PendingMessage record."""

    def test_pending_message_creation(self):
        """Every constructor argument is readable back as an attribute."""
        expected_fields = {
            "message_data": {"test": "data"},
            "timestamp": time.time(),
            "content": "测试内容",
            "from_user": "wxid_test123",
            "from_group": "group123@chatroom",
        }

        pending = PendingMessage(**expected_fields)

        # Compare field by field, in the order the arguments were given.
        for field_name, expected in expected_fields.items():
            assert getattr(pending, field_name) == expected
 
 
class TestMessageAggregation:
    """Tests for the MessageAggregation container."""

    @staticmethod
    def _new_aggregation():
        """Return an empty aggregation for the fixture user and group."""
        return MessageAggregation(
            from_user="wxid_test123",
            from_group="group123@chatroom"
        )

    @staticmethod
    def _new_pending(message_data, timestamp, content):
        """Return a PendingMessage from the fixture user and group."""
        return PendingMessage(
            message_data=message_data,
            timestamp=timestamp,
            content=content,
            from_user="wxid_test123",
            from_group="group123@chatroom"
        )

    def test_message_aggregation_creation(self):
        """A new aggregation is empty, with zeroed times and no timer."""
        fresh = self._new_aggregation()

        assert fresh.from_user == "wxid_test123"
        assert fresh.from_group == "group123@chatroom"
        assert len(fresh.messages) == 0
        assert fresh.first_message_time == 0.0
        assert fresh.last_message_time == 0.0
        assert fresh.timer is None

    def test_add_message(self):
        """Adding one message stores it and sets both time bounds to its timestamp."""
        target = self._new_aggregation()
        now = time.time()

        target.add_message(self._new_pending({"test": "data"}, now, "测试内容"))

        assert len(target.messages) == 1
        assert target.first_message_time == now
        assert target.last_message_time == now

    def test_get_aggregated_content(self):
        """Contents are joined with newlines in insertion order."""
        target = self._new_aggregation()

        # Queue three messages with increasing timestamps.
        for i in range(3):
            target.add_message(
                self._new_pending({"test": f"data{i}"}, time.time() + i, f"消息{i+1}")
            )

        joined = target.get_aggregated_content()
        assert joined == "消息1\n消息2\n消息3"

    def test_get_latest_message_data(self):
        """The latest message data is that of the last message added."""
        target = self._new_aggregation()

        # Queue three messages with increasing timestamps.
        for i in range(3):
            target.add_message(
                self._new_pending(
                    {"test": f"data{i}", "timestamp": i},
                    time.time() + i,
                    f"消息{i+1}",
                )
            )

        newest = target.get_latest_message_data()
        assert newest["test"] == "data2"  # payload of the third (last) message
        assert newest["timestamp"] == 2

    def test_clear(self):
        """clear() empties the messages, zeroes the times and drops the timer."""
        target = self._new_aggregation()
        target.add_message(self._new_pending({"test": "data"}, time.time(), "测试内容"))

        # Attach a timer (never started) so clear() has one to discard.
        target.timer = threading.Timer(1, lambda: None)

        target.clear()

        assert len(target.messages) == 0
        assert target.first_message_time == 0.0
        assert target.last_message_time == 0.0
        assert target.timer is None