"""
DifyAI API client.
"""

import json
import time
from typing import Any, Callable, Dict, List, Optional

import requests
from loguru import logger

from config import settings


class DifyClient:
    """HTTP client for the DifyAI chat API.

    Wraps a shared ``requests.Session`` that carries the bearer token read
    from ``settings`` and offers blocking and streaming (SSE) variants of the
    ``/chat-messages`` call, both with retry and linear back-off.
    """

    def __init__(self):
        """Build the session and default headers from ``settings``."""
        # Trailing slashes are stripped so endpoint paths can be appended
        # with a single "/".
        self.base_url = settings.dify_base_url.rstrip("/")
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {settings.dify_api_key}",
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    @staticmethod
    def _build_chat_payload(
        query: str,
        user: str,
        response_mode: str,
        conversation_id: Optional[str],
        nick_name: Optional[str],
    ) -> Dict[str, Any]:
        """Build the JSON body shared by the blocking and streaming calls."""
        inputs: Dict[str, Any] = {}
        if nick_name:
            inputs["nick_name"] = nick_name

        payload: Dict[str, Any] = {
            "query": query,
            "response_mode": response_mode,
            "user": user,
            "inputs": inputs,
        }
        # The conversation id is only sent when the caller has one.
        if conversation_id:
            payload["conversation_id"] = conversation_id
        return payload

    def _send_with_retries(
        self,
        attempt: Callable[[], Optional[Dict[str, Any]]],
        user: str,
        conversation_id: Optional[str],
        max_retries: Optional[int],
        label: str,
    ) -> Optional[Dict[str, Any]]:
        """Run ``attempt`` until it yields a result or retries are exhausted.

        Args:
            attempt: Zero-argument callable performing one request. It returns
                the response dict on success, or ``None`` after logging why
                the response was unusable. It may raise.
            user: User identifier, used for logging only.
            conversation_id: Conversation id, used for logging only.
            max_retries: Number of retries after the first attempt; ``None``
                falls back to ``settings.max_retry_count``.
            label: Text spliced into the log messages to distinguish the
                blocking ("") and streaming ("流式") callers.

        Returns:
            The first non-``None`` result, or ``None`` when every attempt
            failed.
        """
        if max_retries is None:
            max_retries = settings.max_retry_count

        retry_count = 0
        while retry_count <= max_retries:
            try:
                logger.info(
                    f"发送Dify{label}消息: user={user}, conversation_id={conversation_id}, retry={retry_count}"
                )
                result = attempt()
                if result is not None:
                    logger.info(
                        f"Dify{label}消息发送成功: user={user}, conversation_id={result.get('conversation_id')}"
                    )
                    return result
            except requests.exceptions.Timeout:
                logger.warning(f"Dify{label}请求超时: user={user}, retry={retry_count}")
            except requests.exceptions.RequestException as e:
                logger.error(
                    f"Dify{label}网络错误: user={user}, retry={retry_count}, error={str(e)}"
                )
            except Exception as e:
                # Boundary catch: one failed attempt must not abort the retry loop.
                logger.error(
                    f"Dify{label}请求异常: user={user}, retry={retry_count}, error={str(e)}"
                )

            retry_count += 1
            if retry_count <= max_retries:
                # Linear back-off: the delay grows with the attempt number.
                wait_time = settings.retry_delay * retry_count
                logger.info(f"等待重试: user={user}, wait_time={wait_time}s")
                time.sleep(wait_time)

        logger.error(
            f"Dify{label}消息发送失败,已达最大重试次数: user={user}, max_retries={max_retries}"
        )
        return None

    def _post_blocking(
        self, url: str, payload: Dict[str, Any], user: str
    ) -> Optional[Dict[str, Any]]:
        """Perform one blocking POST and validate the response shape."""
        response = self.session.post(url, json=payload, timeout=60)
        response.raise_for_status()
        result = response.json()

        # A usable reply must be a JSON object carrying both keys.
        if isinstance(result, dict) and "answer" in result and "conversation_id" in result:
            return result

        logger.error(f"Dify响应格式异常: user={user}, response={result}")
        return None

    def _post_streaming(
        self, url: str, payload: Dict[str, Any], user: str
    ) -> Optional[Dict[str, Any]]:
        """Perform one streaming POST and fold the SSE stream into a dict."""
        # The context manager closes the response even when reading stops
        # early (message_end, error event, HTTP error), so the connection is
        # released instead of leaking from the session's pool.
        with self.session.post(
            url,
            json=payload,
            timeout=settings.dify_streaming_timeout,
            stream=True,
        ) as response:
            response.raise_for_status()
            result = self._process_stream_response(response, user)

        if result:
            return result

        logger.error(f"Dify流式响应处理失败: user={user}")
        return None

    def send_chat_message(
        self,
        query: str,
        user: str,
        conversation_id: Optional[str] = None,
        max_retries: Optional[int] = None,
        nick_name: Optional[str] = None,
    ) -> Optional[Dict[str, Any]]:
        """
        Send a chat message in blocking (non-streaming) mode.

        Args:
            query: User input / question text
            user: User identifier
            conversation_id: Conversation id (optional)
            max_retries: Maximum number of retries; defaults to
                ``settings.max_retry_count``
            nick_name: Nickname (optional), forwarded to Dify in ``inputs``

        Returns:
            The response dict, or None on failure
        """
        url = f"{self.base_url}/chat-messages"
        payload = self._build_chat_payload(
            query, user, "blocking", conversation_id, nick_name
        )
        return self._send_with_retries(
            lambda: self._post_blocking(url, payload, user),
            user,
            conversation_id,
            max_retries,
            "",
        )

    def send_chat_message_stream(
        self,
        query: str,
        user: str,
        conversation_id: Optional[str] = None,
        max_retries: Optional[int] = None,
        nick_name: Optional[str] = None,
    ) -> Optional[Dict[str, Any]]:
        """
        Send a chat message in streaming mode and return the assembled reply.

        Args:
            query: User input / question text
            user: User identifier
            conversation_id: Conversation id (optional)
            max_retries: Maximum number of retries; defaults to
                ``settings.max_retry_count``
            nick_name: Nickname (optional), forwarded to Dify in ``inputs``

        Returns:
            The complete response dict, or None on failure
        """
        url = f"{self.base_url}/chat-messages"
        payload = self._build_chat_payload(
            query, user, "streaming", conversation_id, nick_name
        )
        return self._send_with_retries(
            lambda: self._post_streaming(url, payload, user),
            user,
            conversation_id,
            max_retries,
            "流式",
        )

    @staticmethod
    def _parse_sse_data_line(raw_line: Any, user: str) -> Optional[Dict[str, Any]]:
        """Decode one non-empty SSE line into its JSON object.

        Args:
            raw_line: A line from the stream, as ``bytes`` or ``str``.
            user: User identifier, used for logging only.

        Returns:
            The parsed JSON object, or ``None`` when the line is not a
            ``data`` field, carries no payload, is not valid JSON, or is
            valid JSON but not an object.
        """
        # Event streams are defined as UTF-8, so decode explicitly rather
        # than trusting the charset guessed from the response headers.
        if isinstance(raw_line, bytes):
            line = raw_line.decode("utf-8", errors="replace")
        else:
            line = raw_line

        # Skip non-data fields (event:, id:, retry:, comments).
        if not line.startswith("data:"):
            return None

        # A single space after the colon is optional and not part of the value.
        data_str = line[5:]
        if data_str.startswith(" "):
            data_str = data_str[1:]
        if not data_str.strip():
            return None

        try:
            data = json.loads(data_str)
        except json.JSONDecodeError as e:
            logger.warning(
                f"解析流式数据JSON失败: user={user}, data={data_str}, error={str(e)}"
            )
            return None

        # Only JSON objects can carry an "event" key; skip anything else
        # instead of failing the whole stream on an AttributeError.
        if not isinstance(data, dict):
            logger.warning(f"流式数据不是JSON对象: user={user}, data={data_str}")
            return None
        return data

    def _process_stream_response(
        self, response: requests.Response, user: str
    ) -> Optional[Dict[str, Any]]:
        """
        Consume a streaming response and assemble a single result dict.

        Args:
            response: The ``requests`` response opened with ``stream=True``
            user: User identifier, used for logging only

        Returns:
            The complete response dict, or None when an ``error`` event is
            received, the stream yields neither a conversation id nor a
            usable task id, or an exception occurs while reading
        """
        try:
            # Warn, but still try to parse, when the server does not
            # announce an SSE body.
            content_type = response.headers.get("content-type", "")
            if "text/event-stream" not in content_type:
                logger.warning(
                    f"响应不是SSE格式: user={user}, content_type={content_type}"
                )

            complete_answer = ""
            conversation_id = ""
            task_id = ""
            message_id = ""
            created_at = None
            metadata = None
            usage = None
            retriever_resources = None
            message_ended = False  # Set once a message_end event is seen

            logger.info(f"开始处理流式响应: user={user}")

            line_count = 0  # Total lines read, kept for diagnostics
            consecutive_empty_lines = 0
            max_empty_lines = 50  # Guard against a stream that only sends blank lines

            for raw_line in response.iter_lines():
                line_count += 1

                if not raw_line:
                    # Blank lines separate SSE events, so only a long run of
                    # them without any answer yet indicates a broken stream.
                    consecutive_empty_lines += 1
                    if consecutive_empty_lines > max_empty_lines and not complete_answer:
                        logger.warning(
                            f"流式响应过多空行,可能连接异常: user={user}, line_count={line_count}"
                        )
                        break
                    continue
                consecutive_empty_lines = 0

                data = self._parse_sse_data_line(raw_line, user)
                if data is None:
                    continue

                event = data.get("event", "")

                if event == "message":
                    # Message event: accumulate the answer text.
                    answer_chunk = data.get("answer") or ""
                    complete_answer += answer_chunk

                    # Remember identifying fields from the first event that has them.
                    if not conversation_id:
                        conversation_id = data.get("conversation_id", "")
                    if not task_id:
                        task_id = data.get("task_id", "")
                    if not message_id:
                        message_id = data.get("id", "")
                    if created_at is None:
                        created_at = data.get("created_at")

                    logger.debug(
                        f"收到消息块: user={user}, chunk_length={len(answer_chunk)}"
                    )

                elif event == "message_end":
                    # End of message: collect metadata and stop reading.
                    metadata = data.get("metadata")
                    usage = data.get("usage")
                    retriever_resources = data.get("retriever_resources")
                    message_ended = True
                    logger.info(
                        f"流式响应完成: user={user}, total_length={len(complete_answer)}"
                    )
                    break

                elif event == "message_file":
                    # File event: logged only.
                    logger.debug(
                        f"收到文件事件: user={user}, file_type={data.get('type')}"
                    )

                elif event == "message_replace":
                    # Replace event: a non-empty answer overwrites what was accumulated.
                    replace_answer = data.get("answer", "")
                    if replace_answer:
                        complete_answer = replace_answer
                        logger.info(
                            f"消息内容被替换: user={user}, new_length={len(complete_answer)}"
                        )

                elif event in ["workflow_started", "node_started", "node_finished", "workflow_finished"]:
                    # Workflow events: logged only.
                    logger.debug(f"收到工作流事件: user={user}, event={event}")

                elif event in ["tts_message", "tts_message_end"]:
                    # TTS audio events: logged only.
                    logger.debug(f"收到TTS事件: user={user}, event={event}")

                elif event in ["agent_thought", "agent_message"]:
                    # Agent events may be the only carrier of the ids and,
                    # for agent_message, of answer text.
                    logger.debug(
                        f"收到Agent事件: user={user}, event={event}, data_keys={list(data.keys())}"
                    )

                    if not conversation_id and data.get("conversation_id"):
                        conversation_id = data.get("conversation_id")
                        logger.info(
                            f"从Agent事件获取conversation_id: user={user}, conversation_id={conversation_id}"
                        )

                    if not task_id and data.get("task_id"):
                        task_id = data.get("task_id")
                        logger.debug(
                            f"从Agent事件获取task_id: user={user}, task_id={task_id}"
                        )
                    if not message_id and data.get("id"):
                        message_id = data.get("id")
                        logger.debug(
                            f"从Agent事件获取message_id: user={user}, message_id={message_id}"
                        )
                    if created_at is None and data.get("created_at"):
                        created_at = data.get("created_at")
                        logger.debug(
                            f"从Agent事件获取created_at: user={user}, created_at={created_at}"
                        )

                    if event == "agent_message" and data.get("answer"):
                        agent_answer = data.get("answer", "")
                        complete_answer += agent_answer
                        logger.debug(
                            f"从Agent消息获取内容: user={user}, chunk_length={len(agent_answer)}"
                        )

                elif event == "error":
                    # Error event: abandon this stream; the caller may retry.
                    error_msg = data.get("message", "未知错误")
                    logger.error(f"流式响应错误: user={user}, error={error_msg}")
                    return None

                elif event == "ping":
                    # Keep-alive event.
                    logger.debug(f"收到ping事件: user={user}")

                else:
                    # Unknown event type: logged and ignored.
                    logger.debug(f"收到未知事件: user={user}, event={event}")

            # Build the final response. A stream without a conversation id is
            # still accepted when it has a task id plus either answer text or
            # a message_end event; the task id then stands in for the
            # conversation id.
            if conversation_id or (task_id and (complete_answer or message_ended)):
                final_conversation_id = conversation_id or task_id

                result = {
                    "event": "message",
                    "task_id": task_id,
                    "id": message_id,
                    "message_id": message_id,
                    "conversation_id": final_conversation_id,
                    "mode": "chat",
                    "answer": complete_answer,  # May be an empty string
                    "created_at": created_at,
                }

                # Optional fields are only included when present.
                if metadata:
                    result["metadata"] = metadata
                if usage:
                    result["usage"] = usage
                if retriever_resources:
                    result["retriever_resources"] = retriever_resources

                if complete_answer:
                    logger.info(
                        f"流式响应处理成功: user={user}, answer_length={len(complete_answer)}, conversation_id={final_conversation_id}, message_ended={message_ended}"
                    )
                else:
                    logger.info(
                        f"流式响应处理成功(无内容): user={user}, conversation_id={final_conversation_id}, message_ended={message_ended}"
                    )
                return result

            logger.error(
                f"流式响应不完整: user={user}, answer={bool(complete_answer)}, conversation_id={bool(conversation_id)}, task_id={bool(task_id)}"
            )
            # Extra detail for debugging incomplete streams.
            logger.debug(
                f"调试信息: task_id={task_id}, message_id={message_id}, created_at={created_at}, message_ended={message_ended}"
            )
            return None

        except Exception as e:
            # Boundary catch: any failure while reading is reported as None
            # so the caller's retry logic can take over.
            logger.error(f"处理流式响应异常: user={user}, error={str(e)}")
            return None

    def send_message(
        self,
        query: str,
        user: str,
        conversation_id: Optional[str] = None,
        max_retries: Optional[int] = None,
        force_streaming: Optional[bool] = None,
        nick_name: Optional[str] = None,
    ) -> Optional[Dict[str, Any]]:
        """
        Send a chat message, choosing the mode from configuration.

        Args:
            query: User input / question text
            user: User identifier
            conversation_id: Conversation id (optional)
            max_retries: Maximum number of retries
            force_streaming: When not None, overrides
                ``settings.dify_streaming_enabled``
            nick_name: Nickname (optional), forwarded to Dify in ``inputs``

        Returns:
            The response dict, or None on failure
        """
        # An explicit force_streaming value wins over the configured default.
        if force_streaming is not None:
            use_streaming = force_streaming
        else:
            use_streaming = settings.dify_streaming_enabled

        if use_streaming:
            logger.info(f"使用流式模式发送消息: user={user}")
            return self.send_chat_message_stream(
                query, user, conversation_id, max_retries, nick_name
            )

        logger.info(f"使用阻塞模式发送消息: user={user}")
        return self.send_chat_message(
            query, user, conversation_id, max_retries, nick_name
        )

    def get_conversation_messages(
        self, conversation_id: str, user: str
    ) -> Optional[List[Dict[str, Any]]]:
        """
        Fetch the message history of a conversation.

        Args:
            conversation_id: Conversation id
            user: User identifier

        Returns:
            The value of the response's ``data`` key, or None on failure
        """
        try:
            url = f"{self.base_url}/messages"
            params = {"conversation_id": conversation_id, "user": user}

            logger.info(f"获取对话历史: conversation_id={conversation_id}, user={user}")

            response = self.session.get(url, params=params, timeout=30)
            response.raise_for_status()

            result = response.json()

            if "data" in result:
                logger.info(f"成功获取对话历史: conversation_id={conversation_id}")
                return result["data"]

            logger.error(
                f"获取对话历史响应格式异常: conversation_id={conversation_id}, response={result}"
            )
            return None

        except requests.exceptions.RequestException as e:
            logger.error(
                f"获取对话历史网络错误: conversation_id={conversation_id}, error={str(e)}"
            )
            return None
        except Exception as e:
            logger.error(
                f"获取对话历史异常: conversation_id={conversation_id}, error={str(e)}"
            )
            return None


# Module-level shared DifyAI client instance
dify_client = DifyClient()