""" 在线状态监控功能测试 """ import pytest import time from unittest.mock import Mock, patch from app.services.ecloud_client import ecloud_client from app.services.email_service import email_service from app.services.sms_service import sms_service from app.workers.online_status_worker import online_status_worker class TestOnlineStatusMonitor: """在线状态监控测试""" def test_ecloud_query_online_wechat_list(self): """测试查询在线微信列表""" # 模拟有在线微信的情况 with patch.object(ecloud_client.session, 'post') as mock_post: mock_response = Mock() mock_response.json.return_value = { "code": "1000", "message": "成功", "data": [ { "wcId": "wxid_test123", "wId": "test-w-id-123" } ] } mock_post.return_value = mock_response result = ecloud_client.query_online_wechat_list() assert result is not None assert len(result) == 1 assert result[0]["wcId"] == "wxid_test123" # 模拟没有在线微信的情况 with patch.object(ecloud_client.session, 'post') as mock_post: mock_response = Mock() mock_response.json.return_value = { "code": "1000", "message": "成功", "data": [] } mock_post.return_value = mock_response result = ecloud_client.query_online_wechat_list() assert result is not None assert len(result) == 0 def test_sms_service_generate_sign(self): """测试短信服务签名生成""" # 使用文档中的示例数据 sms_service.username = "test" sms_service.password = "123" timestamp = 1596254400000 sign = sms_service._generate_sign(timestamp) expected_sign = "e315cf297826abdeb2092cc57f29f0bf" assert sign == expected_sign def test_email_service_disabled(self): """测试邮件服务禁用状态""" with patch('config.settings.email_enabled', False): result = email_service.send_notification("测试主题", "测试内容") assert result is True # 禁用时应该返回True def test_sms_service_disabled(self): """测试短信服务禁用状态""" with patch('config.settings.sms_enabled', False): result = sms_service.send_notification("测试内容") assert result is True # 禁用时应该返回True def test_online_status_worker_disabled(self): """测试在线状态检测工作进程禁用状态""" with patch('config.settings.online_status_enabled', False): # 创建新的工作进程实例进行测试 from app.workers.online_status_worker import OnlineStatusWorker test_worker = OnlineStatusWorker() test_worker.start() assert test_worker.running is False assert test_worker.worker_thread is None def test_online_status_worker_status(self): """测试获取工作进程状态""" status = online_status_worker.get_status() assert "running" in status assert "enabled" in status assert "check_interval_minutes" in status assert "last_notification_time" in status assert "notification_cooldown_minutes" in status assert isinstance(status["running"], bool) assert isinstance(status["enabled"], bool) assert isinstance(status["check_interval_minutes"], (int, float)) assert isinstance(status["notification_cooldown_minutes"], (int, float)) def test_dynamic_w_id_update(self): """测试动态w_id更新功能""" from config import settings # 保存原始w_id original_w_id = settings.get_current_w_id() try: # 测试更新w_id new_w_id = "test-new-w-id-123" success = settings.update_ecloud_w_id(new_w_id) # 验证更新成功 assert success is True assert settings.get_current_w_id() == new_w_id # 测试相同w_id不会重复更新 success = settings.update_ecloud_w_id(new_w_id) assert success is True finally: # 恢复原始w_id settings.update_ecloud_w_id(original_w_id) def test_w_id_update_in_online_status_check(self): """测试在线状态检测中的w_id更新""" from app.workers.online_status_worker import OnlineStatusWorker # 创建测试工作进程实例 test_worker = OnlineStatusWorker() # 模拟w_id更新 test_w_id = "test-w-id-456" test_worker._update_w_id_if_needed(test_w_id) # 这里主要测试方法不会抛出异常 # 实际的w_id更新逻辑在settings中已经测试过了 def test_startup_contact_sync_logic(self): """测试启动时联系人同步逻辑""" # 测试没有在线微信时的处理逻辑 with patch.object(ecloud_client, 'query_online_wechat_list') as mock_query: # 模拟查询失败 mock_query.return_value = None # 这种情况下应该跳过联系人同步,不会抛出异常 # 模拟没有在线微信 mock_query.return_value = [] # 这种情况下也应该跳过联系人同步 # 模拟有在线微信 mock_query.return_value = [{"wcId": "test_wc_id", "wId": "test_w_id"}] # 这种情况下会尝试同步联系人 def test_email_service_connection_handling(self): """测试邮件服务连接处理""" # 测试邮件服务的连接和关闭逻辑 # 主要确保不会因为连接关闭异常而影响发送结果 # 这里主要测试方法结构,实际的SMTP测试需要真实的邮件服务器 assert hasattr(email_service, 'send_email') assert hasattr(email_service, 'test_connection') if __name__ == "__main__": pytest.main([__file__])