""" 消息聚合服务测试 """ import time import threading import pytest from unittest.mock import patch, MagicMock from app.services.message_aggregator import MessageAggregatorService, PendingMessage, MessageAggregation class TestMessageAggregatorService: """消息聚合服务测试类""" def setup_method(self): """测试前准备""" self.aggregator = MessageAggregatorService() # 设置较短的超时时间用于测试 self.aggregator.aggregation_timeout = 2 self.aggregator.aggregation_enabled = True def teardown_method(self): """测试后清理""" self.aggregator.stop() def test_should_aggregate_message_valid(self): """测试有效消息的聚合判断""" message_data = { "messageType": "80001", "data": { "fromUser": "wxid_test123", "fromGroup": "group123@chatroom", "content": "测试消息", "self": False } } result = self.aggregator.should_aggregate_message(message_data) assert result is True def test_should_aggregate_message_disabled(self): """测试聚合功能禁用时的判断""" self.aggregator.aggregation_enabled = False message_data = { "messageType": "80001", "data": { "fromUser": "wxid_test123", "fromGroup": "group123@chatroom", "content": "测试消息", "self": False } } result = self.aggregator.should_aggregate_message(message_data) assert result is False def test_should_aggregate_message_wrong_type(self): """测试错误消息类型的聚合判断""" message_data = { "messageType": "80002", # 非群聊消息 "data": { "fromUser": "wxid_test123", "fromGroup": "group123@chatroom", "content": "测试消息", "self": False } } result = self.aggregator.should_aggregate_message(message_data) assert result is False def test_should_aggregate_message_self_message(self): """测试自己发送的消息的聚合判断""" message_data = { "messageType": "80001", "data": { "fromUser": "wxid_test123", "fromGroup": "group123@chatroom", "content": "测试消息", "self": True # 自己发送的消息 } } result = self.aggregator.should_aggregate_message(message_data) assert result is False def test_should_aggregate_message_missing_fields(self): """测试缺少必要字段的消息的聚合判断""" message_data = { "messageType": "80001", "data": { "fromUser": "wxid_test123", # 缺少 fromGroup "content": "测试消息", "self": False } } result = self.aggregator.should_aggregate_message(message_data) assert result is False def test_add_message_to_aggregation_no_aggregation(self): """测试不需要聚合的消息""" self.aggregator.aggregation_enabled = False message_data = { "messageType": "80001", "data": { "fromUser": "wxid_test123", "fromGroup": "group123@chatroom", "content": "测试消息", "self": False } } should_process, aggregated_data = self.aggregator.add_message_to_aggregation(message_data) assert should_process is True assert aggregated_data == message_data def test_add_message_to_aggregation_first_message(self): """测试添加第一条消息到聚合""" message_data = { "messageType": "80001", "data": { "fromUser": "wxid_test123", "fromGroup": "group123@chatroom", "content": "第一条消息", "self": False } } should_process, aggregated_data = self.aggregator.add_message_to_aggregation(message_data) assert should_process is False assert aggregated_data is None # 检查聚合状态 status = self.aggregator.get_aggregation_status() assert status["active_aggregations"] == 1 assert "group123@chatroom:wxid_test123" in status["aggregations"] def test_add_message_to_aggregation_multiple_messages(self): """测试添加多条消息到聚合""" message_data_1 = { "messageType": "80001", "data": { "fromUser": "wxid_test123", "fromGroup": "group123@chatroom", "content": "第一条消息", "self": False } } message_data_2 = { "messageType": "80001", "data": { "fromUser": "wxid_test123", "fromGroup": "group123@chatroom", "content": "第二条消息", "self": False } } # 添加第一条消息 should_process_1, _ = self.aggregator.add_message_to_aggregation(message_data_1) assert should_process_1 is False # 添加第二条消息 should_process_2, _ = self.aggregator.add_message_to_aggregation(message_data_2) assert should_process_2 is False # 检查聚合状态 status = self.aggregator.get_aggregation_status() aggregation_info = status["aggregations"]["group123@chatroom:wxid_test123"] assert aggregation_info["message_count"] == 2 @patch('app.services.message_processor.message_processor') def test_process_aggregated_messages_timeout(self, mock_message_processor): """测试聚合消息超时处理""" mock_message_processor.process_single_message.return_value = True message_data = { "messageType": "80001", "data": { "fromUser": "wxid_test123", "fromGroup": "group123@chatroom", "content": "测试消息", "self": False } } # 添加消息到聚合 should_process, _ = self.aggregator.add_message_to_aggregation(message_data) assert should_process is False # 等待超时处理 time.sleep(2.5) # 验证消息处理器被调用 mock_message_processor.process_single_message.assert_called_once() # 检查聚合已被清理 status = self.aggregator.get_aggregation_status() assert status["active_aggregations"] == 0 def test_get_aggregation_status(self): """测试获取聚合状态""" # 初始状态 status = self.aggregator.get_aggregation_status() assert status["enabled"] is True assert status["timeout"] == 2 assert status["active_aggregations"] == 0 assert status["aggregations"] == {} # 添加消息后的状态 message_data = { "messageType": "80001", "data": { "fromUser": "wxid_test123", "fromGroup": "group123@chatroom", "content": "测试消息", "self": False } } self.aggregator.add_message_to_aggregation(message_data) status = self.aggregator.get_aggregation_status() assert status["active_aggregations"] == 1 assert "group123@chatroom:wxid_test123" in status["aggregations"] aggregation_info = status["aggregations"]["group123@chatroom:wxid_test123"] assert aggregation_info["from_user"] == "wxid_test123" assert aggregation_info["from_group"] == "group123@chatroom" assert aggregation_info["message_count"] == 1 @patch('app.services.message_processor.message_processor') def test_force_process_aggregation(self, mock_message_processor): """测试强制处理聚合消息""" mock_message_processor.process_single_message.return_value = True message_data = { "messageType": "80001", "data": { "fromUser": "wxid_test123", "fromGroup": "group123@chatroom", "content": "测试消息", "self": False } } # 添加消息到聚合 self.aggregator.add_message_to_aggregation(message_data) # 强制处理聚合 result = self.aggregator.force_process_aggregation("wxid_test123", "group123@chatroom") assert result is True # 验证消息处理器被调用 mock_message_processor.process_single_message.assert_called_once() # 检查聚合已被清理 status = self.aggregator.get_aggregation_status() assert status["active_aggregations"] == 0 def test_force_process_aggregation_not_found(self): """测试强制处理不存在的聚合""" result = self.aggregator.force_process_aggregation("wxid_test123", "group123@chatroom") assert result is False class TestPendingMessage: """待聚合消息测试类""" def test_pending_message_creation(self): """测试待聚合消息创建""" message_data = {"test": "data"} timestamp = time.time() pending_message = PendingMessage( message_data=message_data, timestamp=timestamp, content="测试内容", from_user="wxid_test123", from_group="group123@chatroom" ) assert pending_message.message_data == message_data assert pending_message.timestamp == timestamp assert pending_message.content == "测试内容" assert pending_message.from_user == "wxid_test123" assert pending_message.from_group == "group123@chatroom" class TestMessageAggregation: """消息聚合对象测试类""" def test_message_aggregation_creation(self): """测试消息聚合对象创建""" aggregation = MessageAggregation( from_user="wxid_test123", from_group="group123@chatroom" ) assert aggregation.from_user == "wxid_test123" assert aggregation.from_group == "group123@chatroom" assert len(aggregation.messages) == 0 assert aggregation.first_message_time == 0.0 assert aggregation.last_message_time == 0.0 assert aggregation.timer is None def test_add_message(self): """测试添加消息到聚合""" aggregation = MessageAggregation( from_user="wxid_test123", from_group="group123@chatroom" ) timestamp = time.time() pending_message = PendingMessage( message_data={"test": "data"}, timestamp=timestamp, content="测试内容", from_user="wxid_test123", from_group="group123@chatroom" ) aggregation.add_message(pending_message) assert len(aggregation.messages) == 1 assert aggregation.first_message_time == timestamp assert aggregation.last_message_time == timestamp def test_get_aggregated_content(self): """测试获取聚合后的内容""" aggregation = MessageAggregation( from_user="wxid_test123", from_group="group123@chatroom" ) # 添加多条消息 for i in range(3): pending_message = PendingMessage( message_data={"test": f"data{i}"}, timestamp=time.time() + i, content=f"消息{i+1}", from_user="wxid_test123", from_group="group123@chatroom" ) aggregation.add_message(pending_message) aggregated_content = aggregation.get_aggregated_content() assert aggregated_content == "消息1\n消息2\n消息3" def test_get_latest_message_data(self): """测试获取最新消息数据""" aggregation = MessageAggregation( from_user="wxid_test123", from_group="group123@chatroom" ) # 添加多条消息 for i in range(3): pending_message = PendingMessage( message_data={"test": f"data{i}", "timestamp": i}, timestamp=time.time() + i, content=f"消息{i+1}", from_user="wxid_test123", from_group="group123@chatroom" ) aggregation.add_message(pending_message) latest_data = aggregation.get_latest_message_data() assert latest_data["test"] == "data2" # 最后一条消息 assert latest_data["timestamp"] == 2 def test_clear(self): """测试清空聚合数据""" aggregation = MessageAggregation( from_user="wxid_test123", from_group="group123@chatroom" ) # 添加消息 pending_message = PendingMessage( message_data={"test": "data"}, timestamp=time.time(), content="测试内容", from_user="wxid_test123", from_group="group123@chatroom" ) aggregation.add_message(pending_message) # 设置定时器 aggregation.timer = threading.Timer(1, lambda: None) # 清空 aggregation.clear() assert len(aggregation.messages) == 0 assert aggregation.first_message_time == 0.0 assert aggregation.last_message_time == 0.0 assert aggregation.timer is None