yj
2025-07-28 99266ea57913663f9880c512726c42cb7e5e7f28
app/services/dify_client.py
@@ -4,6 +4,7 @@
import requests
import time
import json
from typing import List, Dict, Optional, Any
from loguru import logger
from config import settings
@@ -98,6 +99,315 @@
        )
        return None
    def send_chat_message_stream(
        self,
        query: str,
        user: str,
        conversation_id: Optional[str] = None,
        max_retries: int = None,
    ) -> Optional[Dict[str, Any]]:
        """
        发送对话消息(流式模式)
        Args:
            query: 用户输入/提问内容
            user: 用户标识
            conversation_id: 会话ID(可选)
            max_retries: 最大重试次数
        Returns:
            完整的响应数据字典,失败返回None
        """
        if max_retries is None:
            max_retries = settings.max_retry_count
        url = f"{self.base_url}/chat-messages"
        payload = {
            "query": query,
            "response_mode": "streaming",  # 使用流式模式
            "user": user,
            "inputs": {},
        }
        # 如果有会话ID,添加到请求中
        if conversation_id:
            payload["conversation_id"] = conversation_id
        retry_count = 0
        while retry_count <= max_retries:
            try:
                logger.info(
                    f"发送Dify流式消息: user={user}, conversation_id={conversation_id}, retry={retry_count}"
                )
                response = self.session.post(
                    url,
                    json=payload,
                    timeout=settings.dify_streaming_timeout,
                    stream=True
                )
                response.raise_for_status()
                # 处理流式响应
                result = self._process_stream_response(response, user)
                if result:
                    logger.info(
                        f"Dify流式消息发送成功: user={user}, conversation_id={result.get('conversation_id')}"
                    )
                    return result
                else:
                    logger.error(f"Dify流式响应处理失败: user={user}")
            except requests.exceptions.Timeout:
                logger.warning(f"Dify流式请求超时: user={user}, retry={retry_count}")
            except requests.exceptions.RequestException as e:
                logger.error(
                    f"Dify流式网络错误: user={user}, retry={retry_count}, error={str(e)}"
                )
            except Exception as e:
                logger.error(
                    f"Dify流式请求异常: user={user}, retry={retry_count}, error={str(e)}"
                )
            retry_count += 1
            if retry_count <= max_retries:
                wait_time = settings.retry_delay * retry_count
                logger.info(f"等待重试: user={user}, wait_time={wait_time}s")
                time.sleep(wait_time)
        logger.error(
            f"Dify流式消息发送失败,已达最大重试次数: user={user}, max_retries={max_retries}"
        )
        return None
    def _process_stream_response(self, response: requests.Response, user: str) -> Optional[Dict[str, Any]]:
        """
        处理流式响应
        Args:
            response: requests响应对象
            user: 用户标识
        Returns:
            完整的响应数据字典,失败返回None
        """
        try:
            # 检查响应头
            content_type = response.headers.get('content-type', '')
            if 'text/event-stream' not in content_type:
                logger.warning(f"响应不是SSE格式: user={user}, content_type={content_type}")
            complete_answer = ""
            conversation_id = ""
            task_id = ""
            message_id = ""
            created_at = None
            metadata = None
            usage = None
            retriever_resources = None
            message_ended = False  # 标记是否收到message_end事件
            logger.info(f"开始处理流式响应: user={user}")
            # 添加超时和行计数器
            line_count = 0
            max_empty_lines = 50  # 最大连续空行数,防止无限循环
            for line in response.iter_lines(decode_unicode=True):
                line_count += 1
                if not line:
                    # 空行计数,防止无限等待
                    if line_count > max_empty_lines and not complete_answer:
                        logger.warning(f"流式响应过多空行,可能连接异常: user={user}, line_count={line_count}")
                        break
                    continue
                # 跳过非数据行
                if not line.startswith("data: "):
                    continue
                # 提取JSON数据
                data_str = line[6:]  # 移除 "data: " 前缀
                if not data_str.strip():
                    continue
                try:
                    data = json.loads(data_str)
                    event = data.get("event", "")
                    if event == "message":
                        # 消息事件 - 累积答案内容
                        answer_chunk = data.get("answer", "")
                        complete_answer += answer_chunk
                        # 保存基本信息
                        if not conversation_id:
                            conversation_id = data.get("conversation_id", "")
                        if not task_id:
                            task_id = data.get("task_id", "")
                        if not message_id:
                            message_id = data.get("id", "")
                        if created_at is None:
                            created_at = data.get("created_at")
                        logger.debug(f"收到消息块: user={user}, chunk_length={len(answer_chunk)}")
                    elif event == "message_end":
                        # 消息结束事件 - 获取元数据
                        metadata = data.get("metadata")
                        usage = data.get("usage")
                        retriever_resources = data.get("retriever_resources")
                        message_ended = True
                        logger.info(f"流式响应完成: user={user}, total_length={len(complete_answer)}")
                        break
                    elif event == "message_file":
                        # 文件事件 - 记录但继续处理
                        logger.debug(f"收到文件事件: user={user}, file_type={data.get('type')}")
                        continue
                    elif event == "message_replace":
                        # 消息替换事件 - 替换答案内容
                        replace_answer = data.get("answer", "")
                        if replace_answer:
                            complete_answer = replace_answer
                            logger.info(f"消息内容被替换: user={user}, new_length={len(complete_answer)}")
                        continue
                    elif event in ["workflow_started", "node_started", "node_finished", "workflow_finished"]:
                        # 工作流相关事件 - 记录但继续处理
                        logger.debug(f"收到工作流事件: user={user}, event={event}")
                        continue
                    elif event in ["tts_message", "tts_message_end"]:
                        # TTS音频事件 - 记录但继续处理
                        logger.debug(f"收到TTS事件: user={user}, event={event}")
                        continue
                    elif event in ["agent_thought", "agent_message"]:
                        # Agent相关事件 - 需要特殊处理
                        logger.debug(f"收到Agent事件: user={user}, event={event}, data_keys={list(data.keys())}")
                        # 从agent事件中提取conversation_id(如果有的话)
                        if not conversation_id and data.get("conversation_id"):
                            conversation_id = data.get("conversation_id")
                            logger.info(f"从Agent事件获取conversation_id: user={user}, conversation_id={conversation_id}")
                        # 从agent事件中提取基本信息(如果有的话)
                        if not task_id and data.get("task_id"):
                            task_id = data.get("task_id")
                            logger.debug(f"从Agent事件获取task_id: user={user}, task_id={task_id}")
                        if not message_id and data.get("id"):
                            message_id = data.get("id")
                            logger.debug(f"从Agent事件获取message_id: user={user}, message_id={message_id}")
                        if created_at is None and data.get("created_at"):
                            created_at = data.get("created_at")
                            logger.debug(f"从Agent事件获取created_at: user={user}, created_at={created_at}")
                        # 检查agent_message是否包含answer内容
                        if event == "agent_message" and data.get("answer"):
                            agent_answer = data.get("answer", "")
                            complete_answer += agent_answer
                            logger.debug(f"从Agent消息获取内容: user={user}, chunk_length={len(agent_answer)}")
                        continue
                    elif event == "error":
                        # 错误事件
                        error_msg = data.get("message", "未知错误")
                        logger.error(f"流式响应错误: user={user}, error={error_msg}")
                        return None
                    elif event == "ping":
                        # ping事件 - 保持连接
                        logger.debug(f"收到ping事件: user={user}")
                        continue
                    else:
                        # 未知事件类型 - 记录但继续处理
                        logger.debug(f"收到未知事件: user={user}, event={event}")
                        continue
                except json.JSONDecodeError as e:
                    logger.warning(f"解析流式数据JSON失败: user={user}, data={data_str}, error={str(e)}")
                    continue
            # 构建完整响应
            # 对于Agent模式,可能没有conversation_id,但有task_id
            # 在这种情况下,我们可以使用task_id作为conversation_id的替代
            if conversation_id or (task_id and (complete_answer or message_ended)):
                # 如果没有conversation_id但有task_id,使用task_id
                final_conversation_id = conversation_id or task_id
                result = {
                    "event": "message",
                    "task_id": task_id,
                    "id": message_id,
                    "message_id": message_id,
                    "conversation_id": final_conversation_id,
                    "mode": "chat",
                    "answer": complete_answer,  # 可能为空字符串
                    "created_at": created_at
                }
                # 添加可选字段
                if metadata:
                    result["metadata"] = metadata
                if usage:
                    result["usage"] = usage
                if retriever_resources:
                    result["retriever_resources"] = retriever_resources
                if complete_answer:
                    logger.info(f"流式响应处理成功: user={user}, answer_length={len(complete_answer)}, conversation_id={final_conversation_id}, message_ended={message_ended}")
                else:
                    logger.info(f"流式响应处理成功(无内容): user={user}, conversation_id={final_conversation_id}, message_ended={message_ended}")
                return result
            else:
                logger.error(f"流式响应不完整: user={user}, answer={bool(complete_answer)}, conversation_id={bool(conversation_id)}, task_id={bool(task_id)}")
                # 记录更多调试信息
                logger.debug(f"调试信息: task_id={task_id}, message_id={message_id}, created_at={created_at}, message_ended={message_ended}")
                return None
        except Exception as e:
            logger.error(f"处理流式响应异常: user={user}, error={str(e)}")
            return None
    def send_message(
        self,
        query: str,
        user: str,
        conversation_id: Optional[str] = None,
        max_retries: int = None,
        force_streaming: Optional[bool] = None,
    ) -> Optional[Dict[str, Any]]:
        """
        发送对话消息(根据配置选择模式)
        Args:
            query: 用户输入/提问内容
            user: 用户标识
            conversation_id: 会话ID(可选)
            max_retries: 最大重试次数
            force_streaming: 强制使用流式模式(可选,覆盖配置)
        Returns:
            响应数据字典,失败返回None
        """
        # 确定使用哪种模式
        use_streaming = force_streaming if force_streaming is not None else settings.dify_streaming_enabled
        if use_streaming:
            logger.info(f"使用流式模式发送消息: user={user}")
            return self.send_chat_message_stream(query, user, conversation_id, max_retries)
        else:
            logger.info(f"使用阻塞模式发送消息: user={user}")
            return self.send_chat_message(query, user, conversation_id, max_retries)
    def get_conversation_messages(
        self, conversation_id: str, user: str
    ) -> Optional[List[Dict[str, Any]]]: