yj
2025-07-28 69945b730fd3f6b6138ce50e49fc3392fcd74d71
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""
在线状态监控功能测试
"""
 
import pytest
import time
from unittest.mock import Mock, patch
from app.services.ecloud_client import ecloud_client
from app.services.email_service import email_service
from app.services.sms_service import sms_service
from app.workers.online_status_worker import online_status_worker
 
 
class TestOnlineStatusMonitor:
    """在线状态监控测试"""
 
    def test_ecloud_query_online_wechat_list(self):
        """测试查询在线微信列表"""
        # 模拟有在线微信的情况
        with patch.object(ecloud_client.session, 'post') as mock_post:
            mock_response = Mock()
            mock_response.json.return_value = {
                "code": "1000",
                "message": "成功",
                "data": [
                    {
                        "wcId": "wxid_test123",
                        "wId": "test-w-id-123"
                    }
                ]
            }
            mock_post.return_value = mock_response
            
            result = ecloud_client.query_online_wechat_list()
            assert result is not None
            assert len(result) == 1
            assert result[0]["wcId"] == "wxid_test123"
 
        # 模拟没有在线微信的情况
        with patch.object(ecloud_client.session, 'post') as mock_post:
            mock_response = Mock()
            mock_response.json.return_value = {
                "code": "1000",
                "message": "成功",
                "data": []
            }
            mock_post.return_value = mock_response
            
            result = ecloud_client.query_online_wechat_list()
            assert result is not None
            assert len(result) == 0
 
    def test_sms_service_generate_sign(self):
        """测试短信服务签名生成"""
        # 使用文档中的示例数据
        sms_service.username = "test"
        sms_service.password = "123"
        timestamp = 1596254400000
        
        sign = sms_service._generate_sign(timestamp)
        expected_sign = "e315cf297826abdeb2092cc57f29f0bf"
        
        assert sign == expected_sign
 
    def test_email_service_disabled(self):
        """测试邮件服务禁用状态"""
        with patch('config.settings.email_enabled', False):
            result = email_service.send_notification("测试主题", "测试内容")
            assert result is True  # 禁用时应该返回True
 
    def test_sms_service_disabled(self):
        """测试短信服务禁用状态"""
        with patch('config.settings.sms_enabled', False):
            result = sms_service.send_notification("测试内容")
            assert result is True  # 禁用时应该返回True
 
    def test_online_status_worker_disabled(self):
        """测试在线状态检测工作进程禁用状态"""
        with patch('config.settings.online_status_enabled', False):
            # 创建新的工作进程实例进行测试
            from app.workers.online_status_worker import OnlineStatusWorker
            test_worker = OnlineStatusWorker()
            
            test_worker.start()
            assert test_worker.running is False
            assert test_worker.worker_thread is None
 
    def test_online_status_worker_status(self):
        """测试获取工作进程状态"""
        status = online_status_worker.get_status()
 
        assert "running" in status
        assert "enabled" in status
        assert "check_interval_minutes" in status
        assert "last_notification_time" in status
        assert "notification_cooldown_minutes" in status
 
        assert isinstance(status["running"], bool)
        assert isinstance(status["enabled"], bool)
        assert isinstance(status["check_interval_minutes"], (int, float))
        assert isinstance(status["notification_cooldown_minutes"], (int, float))
 
    def test_dynamic_w_id_update(self):
        """测试动态w_id更新功能"""
        from config import settings
 
        # 保存原始w_id
        original_w_id = settings.get_current_w_id()
 
        try:
            # 测试更新w_id
            new_w_id = "test-new-w-id-123"
            success = settings.update_ecloud_w_id(new_w_id)
 
            # 验证更新成功
            assert success is True
            assert settings.get_current_w_id() == new_w_id
 
            # 测试相同w_id不会重复更新
            success = settings.update_ecloud_w_id(new_w_id)
            assert success is True
 
        finally:
            # 恢复原始w_id
            settings.update_ecloud_w_id(original_w_id)
 
    def test_w_id_update_in_online_status_check(self):
        """测试在线状态检测中的w_id更新"""
        from app.workers.online_status_worker import OnlineStatusWorker
 
        # 创建测试工作进程实例
        test_worker = OnlineStatusWorker()
 
        # 模拟w_id更新
        test_w_id = "test-w-id-456"
        test_worker._update_w_id_if_needed(test_w_id)
 
        # 这里主要测试方法不会抛出异常
        # 实际的w_id更新逻辑在settings中已经测试过了
 
    def test_startup_contact_sync_logic(self):
        """测试启动时联系人同步逻辑"""
        # 测试没有在线微信时的处理逻辑
        with patch.object(ecloud_client, 'query_online_wechat_list') as mock_query:
            # 模拟查询失败
            mock_query.return_value = None
            # 这种情况下应该跳过联系人同步,不会抛出异常
 
            # 模拟没有在线微信
            mock_query.return_value = []
            # 这种情况下也应该跳过联系人同步
 
            # 模拟有在线微信
            mock_query.return_value = [{"wcId": "test_wc_id", "wId": "test_w_id"}]
            # 这种情况下会尝试同步联系人
 
    def test_email_service_connection_handling(self):
        """测试邮件服务连接处理"""
        # 测试邮件服务的连接和关闭逻辑
        # 主要确保不会因为连接关闭异常而影响发送结果
 
        # 这里主要测试方法结构,实际的SMTP测试需要真实的邮件服务器
        assert hasattr(email_service, 'send_email')
        assert hasattr(email_service, 'test_connection')
 
 
if __name__ == "__main__":
    pytest.main([__file__])