"""
|
DifyAI API客户端
|
"""
|
|
import requests
|
import time
|
import json
|
from typing import List, Dict, Optional, Any
|
from loguru import logger
|
from config import settings
|
|
|
class DifyClient:
|
"""DifyAI API客户端"""
|
|
def __init__(self):
|
self.base_url = settings.dify_base_url.rstrip("/")
|
self.headers = {
|
"Content-Type": "application/json",
|
"Authorization": f"Bearer {settings.dify_api_key}",
|
}
|
self.session = requests.Session()
|
self.session.headers.update(self.headers)
|
|
def send_chat_message(
|
self,
|
query: str,
|
user: str,
|
conversation_id: Optional[str] = None,
|
max_retries: int = None,
|
nick_name: Optional[str] = None,
|
) -> Optional[Dict[str, Any]]:
|
"""
|
发送对话消息(非流式模式)
|
|
Args:
|
query: 用户输入/提问内容
|
user: 用户标识
|
conversation_id: 会话ID(可选)
|
max_retries: 最大重试次数
|
nick_name: 昵称(可选,用于传递给dify)
|
|
Returns:
|
响应数据字典,失败返回None
|
"""
|
if max_retries is None:
|
max_retries = settings.max_retry_count
|
|
url = f"{self.base_url}/chat-messages"
|
|
# 构建inputs参数
|
inputs = {}
|
if nick_name:
|
inputs["nick_name"] = nick_name
|
|
payload = {
|
"query": query,
|
"response_mode": "blocking", # 使用阻塞模式(非流式)
|
"user": user,
|
"inputs": inputs,
|
}
|
|
# 如果有会话ID,添加到请求中
|
if conversation_id:
|
payload["conversation_id"] = conversation_id
|
|
retry_count = 0
|
while retry_count <= max_retries:
|
try:
|
logger.info(
|
f"发送Dify消息: user={user}, conversation_id={conversation_id}, retry={retry_count}"
|
)
|
|
response = self.session.post(url, json=payload, timeout=60)
|
response.raise_for_status()
|
|
result = response.json()
|
|
# 检查响应是否成功
|
if "answer" in result and "conversation_id" in result:
|
logger.info(
|
f"Dify消息发送成功: user={user}, conversation_id={result.get('conversation_id')}"
|
)
|
return result
|
else:
|
logger.error(f"Dify响应格式异常: user={user}, response={result}")
|
|
except requests.exceptions.Timeout:
|
logger.warning(f"Dify请求超时: user={user}, retry={retry_count}")
|
except requests.exceptions.RequestException as e:
|
logger.error(
|
f"Dify网络错误: user={user}, retry={retry_count}, error={str(e)}"
|
)
|
except Exception as e:
|
logger.error(
|
f"Dify请求异常: user={user}, retry={retry_count}, error={str(e)}"
|
)
|
|
retry_count += 1
|
if retry_count <= max_retries:
|
wait_time = settings.retry_delay * retry_count
|
logger.info(f"等待重试: user={user}, wait_time={wait_time}s")
|
time.sleep(wait_time)
|
|
logger.error(
|
f"Dify消息发送失败,已达最大重试次数: user={user}, max_retries={max_retries}"
|
)
|
return None
|
|
def send_chat_message_stream(
|
self,
|
query: str,
|
user: str,
|
conversation_id: Optional[str] = None,
|
max_retries: int = None,
|
nick_name: Optional[str] = None,
|
) -> Optional[Dict[str, Any]]:
|
"""
|
发送对话消息(流式模式)
|
|
Args:
|
query: 用户输入/提问内容
|
user: 用户标识
|
conversation_id: 会话ID(可选)
|
max_retries: 最大重试次数
|
nick_name: 昵称(可选,用于传递给dify)
|
|
Returns:
|
完整的响应数据字典,失败返回None
|
"""
|
if max_retries is None:
|
max_retries = settings.max_retry_count
|
|
url = f"{self.base_url}/chat-messages"
|
|
# 构建inputs参数
|
inputs = {}
|
if nick_name:
|
inputs["nick_name"] = nick_name
|
|
payload = {
|
"query": query,
|
"response_mode": "streaming", # 使用流式模式
|
"user": user,
|
"inputs": inputs,
|
}
|
|
# 如果有会话ID,添加到请求中
|
if conversation_id:
|
payload["conversation_id"] = conversation_id
|
|
retry_count = 0
|
while retry_count <= max_retries:
|
try:
|
logger.info(
|
f"发送Dify流式消息: user={user}, conversation_id={conversation_id}, retry={retry_count}"
|
)
|
|
response = self.session.post(
|
url,
|
json=payload,
|
timeout=settings.dify_streaming_timeout,
|
stream=True
|
)
|
response.raise_for_status()
|
|
# 处理流式响应
|
result = self._process_stream_response(response, user)
|
|
if result:
|
logger.info(
|
f"Dify流式消息发送成功: user={user}, conversation_id={result.get('conversation_id')}"
|
)
|
return result
|
else:
|
logger.error(f"Dify流式响应处理失败: user={user}")
|
|
except requests.exceptions.Timeout:
|
logger.warning(f"Dify流式请求超时: user={user}, retry={retry_count}")
|
except requests.exceptions.RequestException as e:
|
logger.error(
|
f"Dify流式网络错误: user={user}, retry={retry_count}, error={str(e)}"
|
)
|
except Exception as e:
|
logger.error(
|
f"Dify流式请求异常: user={user}, retry={retry_count}, error={str(e)}"
|
)
|
|
retry_count += 1
|
if retry_count <= max_retries:
|
wait_time = settings.retry_delay * retry_count
|
logger.info(f"等待重试: user={user}, wait_time={wait_time}s")
|
time.sleep(wait_time)
|
|
logger.error(
|
f"Dify流式消息发送失败,已达最大重试次数: user={user}, max_retries={max_retries}"
|
)
|
return None
|
|
def _process_stream_response(self, response: requests.Response, user: str) -> Optional[Dict[str, Any]]:
|
"""
|
处理流式响应
|
|
Args:
|
response: requests响应对象
|
user: 用户标识
|
|
Returns:
|
完整的响应数据字典,失败返回None
|
"""
|
try:
|
# 检查响应头
|
content_type = response.headers.get('content-type', '')
|
if 'text/event-stream' not in content_type:
|
logger.warning(f"响应不是SSE格式: user={user}, content_type={content_type}")
|
|
complete_answer = ""
|
conversation_id = ""
|
task_id = ""
|
message_id = ""
|
created_at = None
|
metadata = None
|
usage = None
|
retriever_resources = None
|
message_ended = False # 标记是否收到message_end事件
|
|
logger.info(f"开始处理流式响应: user={user}")
|
|
# 添加超时和行计数器
|
line_count = 0
|
max_empty_lines = 50 # 最大连续空行数,防止无限循环
|
|
for line in response.iter_lines(decode_unicode=True):
|
line_count += 1
|
|
if not line:
|
# 空行计数,防止无限等待
|
if line_count > max_empty_lines and not complete_answer:
|
logger.warning(f"流式响应过多空行,可能连接异常: user={user}, line_count={line_count}")
|
break
|
continue
|
|
# 跳过非数据行
|
if not line.startswith("data: "):
|
continue
|
|
# 提取JSON数据
|
data_str = line[6:] # 移除 "data: " 前缀
|
|
if not data_str.strip():
|
continue
|
|
try:
|
data = json.loads(data_str)
|
event = data.get("event", "")
|
|
if event == "message":
|
# 消息事件 - 累积答案内容
|
answer_chunk = data.get("answer", "")
|
complete_answer += answer_chunk
|
|
# 保存基本信息
|
if not conversation_id:
|
conversation_id = data.get("conversation_id", "")
|
if not task_id:
|
task_id = data.get("task_id", "")
|
if not message_id:
|
message_id = data.get("id", "")
|
if created_at is None:
|
created_at = data.get("created_at")
|
|
logger.debug(f"收到消息块: user={user}, chunk_length={len(answer_chunk)}")
|
|
elif event == "message_end":
|
# 消息结束事件 - 获取元数据
|
metadata = data.get("metadata")
|
usage = data.get("usage")
|
retriever_resources = data.get("retriever_resources")
|
message_ended = True
|
|
logger.info(f"流式响应完成: user={user}, total_length={len(complete_answer)}")
|
break
|
|
elif event == "message_file":
|
# 文件事件 - 记录但继续处理
|
logger.debug(f"收到文件事件: user={user}, file_type={data.get('type')}")
|
continue
|
|
elif event == "message_replace":
|
# 消息替换事件 - 替换答案内容
|
replace_answer = data.get("answer", "")
|
if replace_answer:
|
complete_answer = replace_answer
|
logger.info(f"消息内容被替换: user={user}, new_length={len(complete_answer)}")
|
continue
|
|
elif event in ["workflow_started", "node_started", "node_finished", "workflow_finished"]:
|
# 工作流相关事件 - 记录但继续处理
|
logger.debug(f"收到工作流事件: user={user}, event={event}")
|
continue
|
|
elif event in ["tts_message", "tts_message_end"]:
|
# TTS音频事件 - 记录但继续处理
|
logger.debug(f"收到TTS事件: user={user}, event={event}")
|
continue
|
|
elif event in ["agent_thought", "agent_message"]:
|
# Agent相关事件 - 需要特殊处理
|
logger.debug(f"收到Agent事件: user={user}, event={event}, data_keys={list(data.keys())}")
|
|
# 从agent事件中提取conversation_id(如果有的话)
|
if not conversation_id and data.get("conversation_id"):
|
conversation_id = data.get("conversation_id")
|
logger.info(f"从Agent事件获取conversation_id: user={user}, conversation_id={conversation_id}")
|
|
# 从agent事件中提取基本信息(如果有的话)
|
if not task_id and data.get("task_id"):
|
task_id = data.get("task_id")
|
logger.debug(f"从Agent事件获取task_id: user={user}, task_id={task_id}")
|
if not message_id and data.get("id"):
|
message_id = data.get("id")
|
logger.debug(f"从Agent事件获取message_id: user={user}, message_id={message_id}")
|
if created_at is None and data.get("created_at"):
|
created_at = data.get("created_at")
|
logger.debug(f"从Agent事件获取created_at: user={user}, created_at={created_at}")
|
|
# 检查agent_message是否包含answer内容
|
if event == "agent_message" and data.get("answer"):
|
agent_answer = data.get("answer", "")
|
complete_answer += agent_answer
|
logger.debug(f"从Agent消息获取内容: user={user}, chunk_length={len(agent_answer)}")
|
|
continue
|
|
elif event == "error":
|
# 错误事件
|
error_msg = data.get("message", "未知错误")
|
logger.error(f"流式响应错误: user={user}, error={error_msg}")
|
return None
|
|
elif event == "ping":
|
# ping事件 - 保持连接
|
logger.debug(f"收到ping事件: user={user}")
|
continue
|
|
else:
|
# 未知事件类型 - 记录但继续处理
|
logger.debug(f"收到未知事件: user={user}, event={event}")
|
continue
|
|
except json.JSONDecodeError as e:
|
logger.warning(f"解析流式数据JSON失败: user={user}, data={data_str}, error={str(e)}")
|
continue
|
|
# 构建完整响应
|
# 对于Agent模式,可能没有conversation_id,但有task_id
|
# 在这种情况下,我们可以使用task_id作为conversation_id的替代
|
if conversation_id or (task_id and (complete_answer or message_ended)):
|
# 如果没有conversation_id但有task_id,使用task_id
|
final_conversation_id = conversation_id or task_id
|
|
result = {
|
"event": "message",
|
"task_id": task_id,
|
"id": message_id,
|
"message_id": message_id,
|
"conversation_id": final_conversation_id,
|
"mode": "chat",
|
"answer": complete_answer, # 可能为空字符串
|
"created_at": created_at
|
}
|
|
# 添加可选字段
|
if metadata:
|
result["metadata"] = metadata
|
if usage:
|
result["usage"] = usage
|
if retriever_resources:
|
result["retriever_resources"] = retriever_resources
|
|
if complete_answer:
|
logger.info(f"流式响应处理成功: user={user}, answer_length={len(complete_answer)}, conversation_id={final_conversation_id}, message_ended={message_ended}")
|
else:
|
logger.info(f"流式响应处理成功(无内容): user={user}, conversation_id={final_conversation_id}, message_ended={message_ended}")
|
return result
|
else:
|
logger.error(f"流式响应不完整: user={user}, answer={bool(complete_answer)}, conversation_id={bool(conversation_id)}, task_id={bool(task_id)}")
|
# 记录更多调试信息
|
logger.debug(f"调试信息: task_id={task_id}, message_id={message_id}, created_at={created_at}, message_ended={message_ended}")
|
return None
|
|
except Exception as e:
|
logger.error(f"处理流式响应异常: user={user}, error={str(e)}")
|
return None
|
|
def send_message(
|
self,
|
query: str,
|
user: str,
|
conversation_id: Optional[str] = None,
|
max_retries: int = None,
|
force_streaming: Optional[bool] = None,
|
nick_name: Optional[str] = None,
|
) -> Optional[Dict[str, Any]]:
|
"""
|
发送对话消息(根据配置选择模式)
|
|
Args:
|
query: 用户输入/提问内容
|
user: 用户标识
|
conversation_id: 会话ID(可选)
|
max_retries: 最大重试次数
|
force_streaming: 强制使用流式模式(可选,覆盖配置)
|
nick_name: 昵称(可选,用于传递给dify)
|
|
Returns:
|
响应数据字典,失败返回None
|
"""
|
# 确定使用哪种模式
|
use_streaming = force_streaming if force_streaming is not None else settings.dify_streaming_enabled
|
|
if use_streaming:
|
logger.info(f"使用流式模式发送消息: user={user}")
|
return self.send_chat_message_stream(query, user, conversation_id, max_retries, nick_name)
|
else:
|
logger.info(f"使用阻塞模式发送消息: user={user}")
|
return self.send_chat_message(query, user, conversation_id, max_retries, nick_name)
|
|
def get_conversation_messages(
|
self, conversation_id: str, user: str
|
) -> Optional[List[Dict[str, Any]]]:
|
"""
|
获取对话历史消息
|
|
Args:
|
conversation_id: 会话ID
|
user: 用户标识
|
|
Returns:
|
消息列表,失败返回None
|
"""
|
try:
|
url = f"{self.base_url}/messages"
|
params = {"conversation_id": conversation_id, "user": user}
|
|
logger.info(f"获取对话历史: conversation_id={conversation_id}, user={user}")
|
|
response = self.session.get(url, params=params, timeout=30)
|
response.raise_for_status()
|
|
result = response.json()
|
|
if "data" in result:
|
logger.info(f"成功获取对话历史: conversation_id={conversation_id}")
|
return result["data"]
|
else:
|
logger.error(
|
f"获取对话历史响应格式异常: conversation_id={conversation_id}, response={result}"
|
)
|
return None
|
|
except requests.exceptions.RequestException as e:
|
logger.error(
|
f"获取对话历史网络错误: conversation_id={conversation_id}, error={str(e)}"
|
)
|
return None
|
except Exception as e:
|
logger.error(
|
f"获取对话历史异常: conversation_id={conversation_id}, error={str(e)}"
|
)
|
return None
|
|
|
# 全局DifyAI客户端实例
|
dify_client = DifyClient()
|