| | |
| | | |
| | | import requests |
| | | import time |
| | | import json |
| | | from typing import List, Dict, Optional, Any |
| | | from loguru import logger |
| | | from config import settings |
| | |
| | | ) |
| | | return None |
| | | |
def send_chat_message_stream(
    self,
    query: str,
    user: str,
    conversation_id: Optional[str] = None,
    max_retries: Optional[int] = None,
) -> Optional[Dict[str, Any]]:
    """
    Send a chat message to Dify in streaming (SSE) mode, with retries.

    These failures are retried:
    - timeouts;
    - network errors;
    - 5xx / 408 / 429 HTTP responses;
    - streams that could not be assembled into a result.

    Before the n-th retry the method sleeps ``settings.retry_delay * n``
    seconds. Other 4xx responses are not retried, because resending the
    identical payload cannot succeed.

    Args:
        query: The user's input / question.
        user: User identifier sent to Dify.
        conversation_id: Existing conversation to continue (optional).
        max_retries: Number of retries after the first attempt; defaults to
            ``settings.max_retry_count``.

    Returns:
        The assembled response dict from ``_process_stream_response``, or
        None if every attempt failed.
    """
    if max_retries is None:
        max_retries = settings.max_retry_count

    url = f"{self.base_url}/chat-messages"
    payload = {
        "query": query,
        "response_mode": "streaming",  # ask Dify for an SSE stream
        "user": user,
        "inputs": {},
    }

    # Only continue an existing conversation when an ID was supplied.
    if conversation_id:
        payload["conversation_id"] = conversation_id

    for retry_count in range(max_retries + 1):
        try:
            logger.info(
                f"发送Dify流式消息: user={user}, conversation_id={conversation_id}, retry={retry_count}"
            )

            # With stream=True the connection stays checked out of the pool
            # until the body is fully read or the response is closed. The
            # parser may stop early (message_end / error event) and
            # raise_for_status() may raise, so always close it here.
            with self.session.post(
                url,
                json=payload,
                timeout=settings.dify_streaming_timeout,
                stream=True,
            ) as response:
                response.raise_for_status()
                result = self._process_stream_response(response, user)

            if result:
                logger.info(
                    f"Dify流式消息发送成功: user={user}, conversation_id={result.get('conversation_id')}"
                )
                return result
            logger.error(f"Dify流式响应处理失败: user={user}")

        except requests.exceptions.Timeout:
            logger.warning(f"Dify流式请求超时: user={user}, retry={retry_count}")
        except requests.exceptions.HTTPError as e:
            logger.error(
                f"Dify流式网络错误: user={user}, retry={retry_count}, error={str(e)}"
            )
            status = e.response.status_code if e.response is not None else None
            # Client errors (bad params, auth, missing conversation, ...) will
            # fail identically on every retry; only 408/429 are transient.
            if status is not None and 400 <= status < 500 and status not in (408, 429):
                logger.error(f"Dify流式请求客户端错误,不再重试: user={user}, status={status}")
                return None
        except requests.exceptions.RequestException as e:
            logger.error(
                f"Dify流式网络错误: user={user}, retry={retry_count}, error={str(e)}"
            )
        except Exception as e:
            logger.error(
                f"Dify流式请求异常: user={user}, retry={retry_count}, error={str(e)}"
            )

        # Linear back-off before the next attempt (none after the last one).
        if retry_count < max_retries:
            wait_time = settings.retry_delay * (retry_count + 1)
            logger.info(f"等待重试: user={user}, wait_time={wait_time}s")
            time.sleep(wait_time)

    logger.error(
        f"Dify流式消息发送失败,已达最大重试次数: user={user}, max_retries={max_retries}"
    )
    return None
| | | |
def _process_stream_response(self, response: requests.Response, user: str) -> Optional[Dict[str, Any]]:
    """
    Consume a Dify SSE response and assemble it into a single result dict.

    Event handling:
    - ``message`` and ``agent_message`` answer chunks are concatenated.
    - ``message_replace`` overwrites the accumulated answer.
    - Identifiers are taken from the first event that carries them.
    - ``message_end`` supplies metadata / usage / retriever resources and
      stops reading.

    Args:
        response: An open streaming ``requests`` response (``stream=True``).
            The caller remains responsible for closing it.
        user: User identifier, used only for logging.

    Returns:
        A dict with the keys ``event``, ``task_id``, ``id``, ``message_id``,
        ``conversation_id``, ``mode``, ``answer`` and ``created_at``, plus
        ``metadata``/``usage``/``retriever_resources`` when received. When
        no conversation ID was received, the task ID is used in its place.

        Returns None in any of these cases:
        - an ``error`` event arrives;
        - an exception is raised;
        - neither a conversation ID nor a task ID with content (or
          ``message_end``) was received.
    """
    try:
        # Warn (but keep going) when the server did not answer with SSE.
        content_type = response.headers.get('content-type', '')
        if 'text/event-stream' not in content_type:
            logger.warning(f"响应不是SSE格式: user={user}, content_type={content_type}")

        complete_answer = ""
        conversation_id = ""
        task_id = ""
        message_id = ""
        created_at = None
        metadata = None
        usage = None
        retriever_resources = None
        message_ended = False  # set once message_end has been received

        def remember_ids(event_data: Dict[str, Any], event_name: str) -> None:
            """Record stream identifiers the first time an event provides them."""
            nonlocal conversation_id, task_id, message_id, created_at
            if not conversation_id and event_data.get("conversation_id"):
                conversation_id = event_data["conversation_id"]
            if not task_id and event_data.get("task_id"):
                task_id = event_data["task_id"]
            if not message_id:
                candidate = event_data.get("message_id")
                # On agent_thought events "id" is the thought ID, not the message ID.
                if not candidate and event_name != "agent_thought":
                    candidate = event_data.get("id")
                if candidate:
                    message_id = candidate
            if created_at is None and event_data.get("created_at") is not None:
                created_at = event_data["created_at"]

        logger.info(f"开始处理流式响应: user={user}")

        line_count = 0
        # SSE separates every event with a blank line, so only *consecutive*
        # blank lines indicate a broken connection. Counting all lines would
        # abort long workflow/agent runs before the first answer chunk.
        consecutive_empty_lines = 0
        max_empty_lines = 50

        # Iterate raw bytes and decode each line as UTF-8 ourselves:
        # SSE is always UTF-8, but requests falls back to ISO-8859-1 for text/*
        # responses without a charset, and str.splitlines() (used when
        # decode_unicode=True) also splits on U+2028/U+0085 inside JSON payloads.
        for raw_line in response.iter_lines():
            line_count += 1
            if isinstance(raw_line, bytes):
                line = raw_line.decode("utf-8", errors="replace")
            else:
                line = raw_line

            if not line:
                consecutive_empty_lines += 1
                if consecutive_empty_lines > max_empty_lines and not complete_answer:
                    logger.warning(f"流式响应过多空行,可能连接异常: user={user}, line_count={line_count}")
                    break
                continue
            consecutive_empty_lines = 0

            # Only "data:" fields carry payloads; the SSE spec allows an
            # optional single space after the colon.
            if not line.startswith("data:"):
                continue
            data_str = line[5:]
            if data_str.startswith(" "):
                data_str = data_str[1:]
            if not data_str.strip():
                continue

            try:
                data = json.loads(data_str)
            except json.JSONDecodeError as e:
                logger.warning(f"解析流式数据JSON失败: user={user}, data={data_str}, error={str(e)}")
                continue
            if not isinstance(data, dict):
                logger.warning(f"流式数据不是JSON对象: user={user}, data={data_str}")
                continue

            event = data.get("event", "")

            if event == "message":
                # Answer chunk: append it and capture identifiers.
                answer_chunk = data.get("answer") or ""
                complete_answer += answer_chunk
                remember_ids(data, event)
                logger.debug(f"收到消息块: user={user}, chunk_length={len(answer_chunk)}")

            elif event == "message_end":
                remember_ids(data, event)
                metadata = data.get("metadata")
                usage = data.get("usage")
                retriever_resources = data.get("retriever_resources")
                # Dify's API docs place usage / retriever_resources inside
                # metadata; fall back to it when they are not top-level.
                if isinstance(metadata, dict):
                    if usage is None:
                        usage = metadata.get("usage")
                    if retriever_resources is None:
                        retriever_resources = metadata.get("retriever_resources")
                message_ended = True
                logger.info(f"流式响应完成: user={user}, total_length={len(complete_answer)}")
                break

            elif event == "message_file":
                logger.debug(f"收到文件事件: user={user}, file_type={data.get('type')}")

            elif event == "message_replace":
                # Replacement content (e.g. moderation) overrides the accumulated answer.
                replace_answer = data.get("answer", "")
                if replace_answer:
                    complete_answer = replace_answer
                    logger.info(f"消息内容被替换: user={user}, new_length={len(complete_answer)}")

            elif event in ("workflow_started", "node_started", "node_finished", "workflow_finished"):
                logger.debug(f"收到工作流事件: user={user}, event={event}")

            elif event in ("tts_message", "tts_message_end"):
                logger.debug(f"收到TTS事件: user={user}, event={event}")

            elif event in ("agent_thought", "agent_message"):
                logger.debug(f"收到Agent事件: user={user}, event={event}, data_keys={list(data.keys())}")
                had_conversation_id = bool(conversation_id)
                remember_ids(data, event)
                if conversation_id and not had_conversation_id:
                    logger.info(f"从Agent事件获取conversation_id: user={user}, conversation_id={conversation_id}")
                # agent_message events carry answer chunks in agent mode.
                if event == "agent_message" and data.get("answer"):
                    agent_answer = data["answer"]
                    complete_answer += agent_answer
                    logger.debug(f"从Agent消息获取内容: user={user}, chunk_length={len(agent_answer)}")

            elif event == "error":
                error_msg = data.get("message", "未知错误")
                logger.error(f"流式响应错误: user={user}, error={error_msg}")
                return None

            elif event == "ping":
                # Keep-alive; nothing to do.
                logger.debug(f"收到ping事件: user={user}")

            else:
                logger.debug(f"收到未知事件: user={user}, event={event}")

        if not message_ended:
            logger.warning(f"流式响应未收到message_end事件: user={user}, answer_length={len(complete_answer)}")

        # Agent-mode streams may lack a conversation_id but carry a task_id;
        # accept that case (using task_id in its place) when content or a
        # message_end was received.
        if not (conversation_id or (task_id and (complete_answer or message_ended))):
            logger.error(f"流式响应不完整: user={user}, answer={bool(complete_answer)}, conversation_id={bool(conversation_id)}, task_id={bool(task_id)}")
            logger.debug(f"调试信息: task_id={task_id}, message_id={message_id}, created_at={created_at}, message_ended={message_ended}")
            return None

        final_conversation_id = conversation_id or task_id

        result = {
            "event": "message",
            "task_id": task_id,
            "id": message_id,
            "message_id": message_id,
            "conversation_id": final_conversation_id,
            "mode": "chat",
            "answer": complete_answer,  # may be an empty string
            "created_at": created_at,
        }
        if metadata:
            result["metadata"] = metadata
        if usage:
            result["usage"] = usage
        if retriever_resources:
            result["retriever_resources"] = retriever_resources

        if complete_answer:
            logger.info(f"流式响应处理成功: user={user}, answer_length={len(complete_answer)}, conversation_id={final_conversation_id}, message_ended={message_ended}")
        else:
            logger.info(f"流式响应处理成功(无内容): user={user}, conversation_id={final_conversation_id}, message_ended={message_ended}")
        return result

    except requests.exceptions.RequestException as e:
        # Network failure while reading the stream (read timeout, broken chunking, ...).
        logger.error(f"处理流式响应异常: user={user}, error={str(e)}")
        return None
    except Exception as e:
        # Unexpected failure: keep the traceback so the bug can be found.
        logger.exception(f"处理流式响应异常: user={user}, error={str(e)}")
        return None
| | | |
def send_message(
    self,
    query: str,
    user: str,
    conversation_id: Optional[str] = None,
    max_retries: Optional[int] = None,
    force_streaming: Optional[bool] = None,
) -> Optional[Dict[str, Any]]:
    """
    Send a chat message, choosing streaming or blocking mode.

    Args:
        query: The user's input / question.
        user: User identifier.
        conversation_id: Existing conversation to continue (optional).
        max_retries: Maximum number of retries, forwarded unchanged to the
            selected sender.
        force_streaming: When not None, overrides
            ``settings.dify_streaming_enabled`` (True = streaming,
            False = blocking).

    Returns:
        The response dict from the selected sender, or None on failure.
    """
    # An explicit True/False overrides the configured default.
    if force_streaming is None:
        use_streaming = settings.dify_streaming_enabled
    else:
        use_streaming = force_streaming

    if use_streaming:
        logger.info(f"使用流式模式发送消息: user={user}")
        return self.send_chat_message_stream(query, user, conversation_id, max_retries)

    logger.info(f"使用阻塞模式发送消息: user={user}")
    return self.send_chat_message(query, user, conversation_id, max_retries)
| | | |
| | | def get_conversation_messages( |
| | | self, conversation_id: str, user: str |
| | | ) -> Optional[List[Dict[str, Any]]]: |