"""
|
消息聚合服务测试
|
"""
|
|
import time
|
import threading
|
import pytest
|
from unittest.mock import patch, MagicMock
|
from app.services.message_aggregator import MessageAggregatorService, PendingMessage, MessageAggregation
|
|
|
class TestMessageAggregatorService:
|
"""消息聚合服务测试类"""
|
|
def setup_method(self):
|
"""测试前准备"""
|
self.aggregator = MessageAggregatorService()
|
# 设置较短的超时时间用于测试
|
self.aggregator.aggregation_timeout = 2
|
self.aggregator.aggregation_enabled = True
|
|
def teardown_method(self):
|
"""测试后清理"""
|
self.aggregator.stop()
|
|
def test_should_aggregate_message_valid(self):
|
"""测试有效消息的聚合判断"""
|
message_data = {
|
"messageType": "80001",
|
"data": {
|
"fromUser": "wxid_test123",
|
"fromGroup": "group123@chatroom",
|
"content": "测试消息",
|
"self": False
|
}
|
}
|
|
result = self.aggregator.should_aggregate_message(message_data)
|
assert result is True
|
|
def test_should_aggregate_message_disabled(self):
|
"""测试聚合功能禁用时的判断"""
|
self.aggregator.aggregation_enabled = False
|
|
message_data = {
|
"messageType": "80001",
|
"data": {
|
"fromUser": "wxid_test123",
|
"fromGroup": "group123@chatroom",
|
"content": "测试消息",
|
"self": False
|
}
|
}
|
|
result = self.aggregator.should_aggregate_message(message_data)
|
assert result is False
|
|
def test_should_aggregate_message_wrong_type(self):
|
"""测试错误消息类型的聚合判断"""
|
message_data = {
|
"messageType": "80002", # 非群聊消息
|
"data": {
|
"fromUser": "wxid_test123",
|
"fromGroup": "group123@chatroom",
|
"content": "测试消息",
|
"self": False
|
}
|
}
|
|
result = self.aggregator.should_aggregate_message(message_data)
|
assert result is False
|
|
def test_should_aggregate_message_self_message(self):
|
"""测试自己发送的消息的聚合判断"""
|
message_data = {
|
"messageType": "80001",
|
"data": {
|
"fromUser": "wxid_test123",
|
"fromGroup": "group123@chatroom",
|
"content": "测试消息",
|
"self": True # 自己发送的消息
|
}
|
}
|
|
result = self.aggregator.should_aggregate_message(message_data)
|
assert result is False
|
|
def test_should_aggregate_message_missing_fields(self):
|
"""测试缺少必要字段的消息的聚合判断"""
|
message_data = {
|
"messageType": "80001",
|
"data": {
|
"fromUser": "wxid_test123",
|
# 缺少 fromGroup
|
"content": "测试消息",
|
"self": False
|
}
|
}
|
|
result = self.aggregator.should_aggregate_message(message_data)
|
assert result is False
|
|
def test_add_message_to_aggregation_no_aggregation(self):
|
"""测试不需要聚合的消息"""
|
self.aggregator.aggregation_enabled = False
|
|
message_data = {
|
"messageType": "80001",
|
"data": {
|
"fromUser": "wxid_test123",
|
"fromGroup": "group123@chatroom",
|
"content": "测试消息",
|
"self": False
|
}
|
}
|
|
should_process, aggregated_data = self.aggregator.add_message_to_aggregation(message_data)
|
|
assert should_process is True
|
assert aggregated_data == message_data
|
|
def test_add_message_to_aggregation_first_message(self):
|
"""测试添加第一条消息到聚合"""
|
message_data = {
|
"messageType": "80001",
|
"data": {
|
"fromUser": "wxid_test123",
|
"fromGroup": "group123@chatroom",
|
"content": "第一条消息",
|
"self": False
|
}
|
}
|
|
should_process, aggregated_data = self.aggregator.add_message_to_aggregation(message_data)
|
|
assert should_process is False
|
assert aggregated_data is None
|
|
# 检查聚合状态
|
status = self.aggregator.get_aggregation_status()
|
assert status["active_aggregations"] == 1
|
assert "group123@chatroom:wxid_test123" in status["aggregations"]
|
|
def test_add_message_to_aggregation_multiple_messages(self):
|
"""测试添加多条消息到聚合"""
|
message_data_1 = {
|
"messageType": "80001",
|
"data": {
|
"fromUser": "wxid_test123",
|
"fromGroup": "group123@chatroom",
|
"content": "第一条消息",
|
"self": False
|
}
|
}
|
|
message_data_2 = {
|
"messageType": "80001",
|
"data": {
|
"fromUser": "wxid_test123",
|
"fromGroup": "group123@chatroom",
|
"content": "第二条消息",
|
"self": False
|
}
|
}
|
|
# 添加第一条消息
|
should_process_1, _ = self.aggregator.add_message_to_aggregation(message_data_1)
|
assert should_process_1 is False
|
|
# 添加第二条消息
|
should_process_2, _ = self.aggregator.add_message_to_aggregation(message_data_2)
|
assert should_process_2 is False
|
|
# 检查聚合状态
|
status = self.aggregator.get_aggregation_status()
|
aggregation_info = status["aggregations"]["group123@chatroom:wxid_test123"]
|
assert aggregation_info["message_count"] == 2
|
|
@patch('app.services.message_processor.message_processor')
|
def test_process_aggregated_messages_timeout(self, mock_message_processor):
|
"""测试聚合消息超时处理"""
|
mock_message_processor.process_single_message.return_value = True
|
|
message_data = {
|
"messageType": "80001",
|
"data": {
|
"fromUser": "wxid_test123",
|
"fromGroup": "group123@chatroom",
|
"content": "测试消息",
|
"self": False
|
}
|
}
|
|
# 添加消息到聚合
|
should_process, _ = self.aggregator.add_message_to_aggregation(message_data)
|
assert should_process is False
|
|
# 等待超时处理
|
time.sleep(2.5)
|
|
# 验证消息处理器被调用
|
mock_message_processor.process_single_message.assert_called_once()
|
|
# 检查聚合已被清理
|
status = self.aggregator.get_aggregation_status()
|
assert status["active_aggregations"] == 0
|
|
def test_get_aggregation_status(self):
|
"""测试获取聚合状态"""
|
# 初始状态
|
status = self.aggregator.get_aggregation_status()
|
assert status["enabled"] is True
|
assert status["timeout"] == 2
|
assert status["active_aggregations"] == 0
|
assert status["aggregations"] == {}
|
|
# 添加消息后的状态
|
message_data = {
|
"messageType": "80001",
|
"data": {
|
"fromUser": "wxid_test123",
|
"fromGroup": "group123@chatroom",
|
"content": "测试消息",
|
"self": False
|
}
|
}
|
|
self.aggregator.add_message_to_aggregation(message_data)
|
|
status = self.aggregator.get_aggregation_status()
|
assert status["active_aggregations"] == 1
|
assert "group123@chatroom:wxid_test123" in status["aggregations"]
|
|
aggregation_info = status["aggregations"]["group123@chatroom:wxid_test123"]
|
assert aggregation_info["from_user"] == "wxid_test123"
|
assert aggregation_info["from_group"] == "group123@chatroom"
|
assert aggregation_info["message_count"] == 1
|
|
@patch('app.services.message_processor.message_processor')
|
def test_force_process_aggregation(self, mock_message_processor):
|
"""测试强制处理聚合消息"""
|
mock_message_processor.process_single_message.return_value = True
|
|
message_data = {
|
"messageType": "80001",
|
"data": {
|
"fromUser": "wxid_test123",
|
"fromGroup": "group123@chatroom",
|
"content": "测试消息",
|
"self": False
|
}
|
}
|
|
# 添加消息到聚合
|
self.aggregator.add_message_to_aggregation(message_data)
|
|
# 强制处理聚合
|
result = self.aggregator.force_process_aggregation("wxid_test123", "group123@chatroom")
|
assert result is True
|
|
# 验证消息处理器被调用
|
mock_message_processor.process_single_message.assert_called_once()
|
|
# 检查聚合已被清理
|
status = self.aggregator.get_aggregation_status()
|
assert status["active_aggregations"] == 0
|
|
def test_force_process_aggregation_not_found(self):
|
"""测试强制处理不存在的聚合"""
|
result = self.aggregator.force_process_aggregation("wxid_test123", "group123@chatroom")
|
assert result is False
|
|
|
class TestPendingMessage:
|
"""待聚合消息测试类"""
|
|
def test_pending_message_creation(self):
|
"""测试待聚合消息创建"""
|
message_data = {"test": "data"}
|
timestamp = time.time()
|
|
pending_message = PendingMessage(
|
message_data=message_data,
|
timestamp=timestamp,
|
content="测试内容",
|
from_user="wxid_test123",
|
from_group="group123@chatroom"
|
)
|
|
assert pending_message.message_data == message_data
|
assert pending_message.timestamp == timestamp
|
assert pending_message.content == "测试内容"
|
assert pending_message.from_user == "wxid_test123"
|
assert pending_message.from_group == "group123@chatroom"
|
|
|
class TestMessageAggregation:
|
"""消息聚合对象测试类"""
|
|
def test_message_aggregation_creation(self):
|
"""测试消息聚合对象创建"""
|
aggregation = MessageAggregation(
|
from_user="wxid_test123",
|
from_group="group123@chatroom"
|
)
|
|
assert aggregation.from_user == "wxid_test123"
|
assert aggregation.from_group == "group123@chatroom"
|
assert len(aggregation.messages) == 0
|
assert aggregation.first_message_time == 0.0
|
assert aggregation.last_message_time == 0.0
|
assert aggregation.timer is None
|
|
def test_add_message(self):
|
"""测试添加消息到聚合"""
|
aggregation = MessageAggregation(
|
from_user="wxid_test123",
|
from_group="group123@chatroom"
|
)
|
|
timestamp = time.time()
|
pending_message = PendingMessage(
|
message_data={"test": "data"},
|
timestamp=timestamp,
|
content="测试内容",
|
from_user="wxid_test123",
|
from_group="group123@chatroom"
|
)
|
|
aggregation.add_message(pending_message)
|
|
assert len(aggregation.messages) == 1
|
assert aggregation.first_message_time == timestamp
|
assert aggregation.last_message_time == timestamp
|
|
def test_get_aggregated_content(self):
|
"""测试获取聚合后的内容"""
|
aggregation = MessageAggregation(
|
from_user="wxid_test123",
|
from_group="group123@chatroom"
|
)
|
|
# 添加多条消息
|
for i in range(3):
|
pending_message = PendingMessage(
|
message_data={"test": f"data{i}"},
|
timestamp=time.time() + i,
|
content=f"消息{i+1}",
|
from_user="wxid_test123",
|
from_group="group123@chatroom"
|
)
|
aggregation.add_message(pending_message)
|
|
aggregated_content = aggregation.get_aggregated_content()
|
assert aggregated_content == "消息1\n消息2\n消息3"
|
|
def test_get_latest_message_data(self):
|
"""测试获取最新消息数据"""
|
aggregation = MessageAggregation(
|
from_user="wxid_test123",
|
from_group="group123@chatroom"
|
)
|
|
# 添加多条消息
|
for i in range(3):
|
pending_message = PendingMessage(
|
message_data={"test": f"data{i}", "timestamp": i},
|
timestamp=time.time() + i,
|
content=f"消息{i+1}",
|
from_user="wxid_test123",
|
from_group="group123@chatroom"
|
)
|
aggregation.add_message(pending_message)
|
|
latest_data = aggregation.get_latest_message_data()
|
assert latest_data["test"] == "data2" # 最后一条消息
|
assert latest_data["timestamp"] == 2
|
|
def test_clear(self):
|
"""测试清空聚合数据"""
|
aggregation = MessageAggregation(
|
from_user="wxid_test123",
|
from_group="group123@chatroom"
|
)
|
|
# 添加消息
|
pending_message = PendingMessage(
|
message_data={"test": "data"},
|
timestamp=time.time(),
|
content="测试内容",
|
from_user="wxid_test123",
|
from_group="group123@chatroom"
|
)
|
aggregation.add_message(pending_message)
|
|
# 设置定时器
|
aggregation.timer = threading.Timer(1, lambda: None)
|
|
# 清空
|
aggregation.clear()
|
|
assert len(aggregation.messages) == 0
|
assert aggregation.first_message_time == 0.0
|
assert aggregation.last_message_time == 0.0
|
assert aggregation.timer is None
|