yj
2025-07-28 99266ea57913663f9880c512726c42cb7e5e7f28
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""
测试Dify流式模式功能
"""
 
import pytest
import json
from unittest.mock import Mock, patch, MagicMock
from app.services.dify_client import DifyClient
from config import settings
 
 
class TestDifyStreaming:
    """测试Dify流式模式"""
 
    def setup_method(self):
        """测试前设置"""
        self.client = DifyClient()
 
    def test_process_stream_response_success(self):
        """测试成功处理流式响应"""
        # 模拟流式响应数据
        stream_data = [
            "data: {\"event\": \"message\", \"task_id\": \"test-task\", \"id\": \"test-msg\", \"conversation_id\": \"test-conv\", \"answer\": \"Hello\", \"created_at\": 1705398420}",
            "data: {\"event\": \"message\", \"task_id\": \"test-task\", \"id\": \"test-msg\", \"conversation_id\": \"test-conv\", \"answer\": \" World\", \"created_at\": 1705398420}",
            "data: {\"event\": \"message_end\", \"metadata\": {\"usage\": {\"total_tokens\": 10}}, \"usage\": {\"total_tokens\": 10}}",
        ]
 
        # 创建模拟响应对象
        mock_response = Mock()
        mock_response.iter_lines.return_value = stream_data
 
        # 测试处理流式响应
        result = self.client._process_stream_response(mock_response, "test_user")
 
        # 验证结果
        assert result is not None
        assert result["answer"] == "Hello World"
        assert result["conversation_id"] == "test-conv"
        assert result["task_id"] == "test-task"
        assert result["usage"]["total_tokens"] == 10
 
    def test_process_stream_response_error(self):
        """测试处理流式响应错误"""
        # 模拟错误响应数据
        stream_data = [
            "data: {\"event\": \"error\", \"message\": \"API调用失败\", \"code\": \"500\"}",
        ]
 
        # 创建模拟响应对象
        mock_response = Mock()
        mock_response.iter_lines.return_value = stream_data
 
        # 测试处理流式响应
        result = self.client._process_stream_response(mock_response, "test_user")
 
        # 验证结果
        assert result is None
 
    def test_process_stream_response_incomplete(self):
        """测试处理不完整的流式响应"""
        # 模拟不完整响应数据(缺少message_end事件)
        stream_data = [
            "data: {\"event\": \"message\", \"task_id\": \"test-task\", \"id\": \"test-msg\", \"conversation_id\": \"test-conv\", \"answer\": \"Hello\", \"created_at\": 1705398420}",
        ]
 
        # 创建模拟响应对象
        mock_response = Mock()
        mock_response.iter_lines.return_value = stream_data
 
        # 测试处理流式响应
        result = self.client._process_stream_response(mock_response, "test_user")
 
        # 验证结果 - 即使没有message_end事件,只要有内容和conversation_id也应该返回结果
        assert result is not None
        assert result["answer"] == "Hello"
        assert result["conversation_id"] == "test-conv"
 
    @patch('app.services.dify_client.settings')
    def test_send_message_uses_streaming_when_enabled(self, mock_settings):
        """测试当启用流式模式时使用流式发送"""
        # 设置配置为启用流式模式
        mock_settings.dify_streaming_enabled = True
        
        # 模拟流式发送方法
        with patch.object(self.client, 'send_chat_message_stream') as mock_stream:
            mock_stream.return_value = {"answer": "test response", "conversation_id": "test-conv"}
            
            result = self.client.send_message("test query", "test_user")
            
            # 验证调用了流式方法
            mock_stream.assert_called_once_with("test query", "test_user", None, None)
            assert result["answer"] == "test response"
 
    @patch('app.services.dify_client.settings')
    def test_send_message_uses_blocking_when_disabled(self, mock_settings):
        """测试当禁用流式模式时使用阻塞发送"""
        # 设置配置为禁用流式模式
        mock_settings.dify_streaming_enabled = False
        
        # 模拟阻塞发送方法
        with patch.object(self.client, 'send_chat_message') as mock_blocking:
            mock_blocking.return_value = {"answer": "test response", "conversation_id": "test-conv"}
            
            result = self.client.send_message("test query", "test_user")
            
            # 验证调用了阻塞方法
            mock_blocking.assert_called_once_with("test query", "test_user", None, None)
            assert result["answer"] == "test response"
 
    @patch('app.services.dify_client.settings')
    def test_send_message_force_streaming_override(self, mock_settings):
        """测试强制流式模式覆盖配置"""
        # 设置配置为禁用流式模式
        mock_settings.dify_streaming_enabled = False
        
        # 模拟流式发送方法
        with patch.object(self.client, 'send_chat_message_stream') as mock_stream:
            mock_stream.return_value = {"answer": "test response", "conversation_id": "test-conv"}
            
            # 强制使用流式模式
            result = self.client.send_message("test query", "test_user", force_streaming=True)
            
            # 验证调用了流式方法(覆盖了配置)
            mock_stream.assert_called_once_with("test query", "test_user", None, None)
            assert result["answer"] == "test response"
 
    def test_process_stream_response_with_ping_events(self):
        """测试处理包含ping事件的流式响应"""
        # 模拟包含ping事件的响应数据
        stream_data = [
            "data: {\"event\": \"ping\"}",
            "data: {\"event\": \"message\", \"task_id\": \"test-task\", \"id\": \"test-msg\", \"conversation_id\": \"test-conv\", \"answer\": \"Hello\", \"created_at\": 1705398420}",
            "data: {\"event\": \"ping\"}",
            "data: {\"event\": \"message\", \"task_id\": \"test-task\", \"id\": \"test-msg\", \"conversation_id\": \"test-conv\", \"answer\": \" World\", \"created_at\": 1705398420}",
            "data: {\"event\": \"message_end\", \"metadata\": {\"usage\": {\"total_tokens\": 10}}}",
        ]
 
        # 创建模拟响应对象
        mock_response = Mock()
        mock_response.iter_lines.return_value = stream_data
 
        # 测试处理流式响应
        result = self.client._process_stream_response(mock_response, "test_user")
 
        # 验证结果(ping事件应该被忽略)
        assert result is not None
        assert result["answer"] == "Hello World"
        assert result["conversation_id"] == "test-conv"
 
    def test_process_stream_response_with_invalid_json(self):
        """测试处理包含无效JSON的流式响应"""
        # 模拟包含无效JSON的响应数据
        stream_data = [
            "data: {\"event\": \"message\", \"task_id\": \"test-task\", \"id\": \"test-msg\", \"conversation_id\": \"test-conv\", \"answer\": \"Hello\", \"created_at\": 1705398420}",
            "data: invalid json data",
            "data: {\"event\": \"message\", \"task_id\": \"test-task\", \"id\": \"test-msg\", \"conversation_id\": \"test-conv\", \"answer\": \" World\", \"created_at\": 1705398420}",
            "data: {\"event\": \"message_end\"}",
        ]
 
        # 创建模拟响应对象
        mock_response = Mock()
        mock_response.iter_lines.return_value = stream_data
 
        # 测试处理流式响应
        result = self.client._process_stream_response(mock_response, "test_user")
 
        # 验证结果(无效JSON应该被跳过)
        assert result is not None
        assert result["answer"] == "Hello World"
        assert result["conversation_id"] == "test-conv"
 
 
if __name__ == "__main__":
    pytest.main([__file__])