""" DifyAI API客户端 """ import requests import time from typing import List, Dict, Optional, Any from loguru import logger from config import settings class DifyClient: """DifyAI API客户端""" def __init__(self): self.base_url = settings.dify_base_url.rstrip("/") self.headers = { "Content-Type": "application/json", "Authorization": f"Bearer {settings.dify_api_key}", } self.session = requests.Session() self.session.headers.update(self.headers) def send_chat_message( self, query: str, user: str, conversation_id: Optional[str] = None, max_retries: int = None, ) -> Optional[Dict[str, Any]]: """ 发送对话消息(非流式模式) Args: query: 用户输入/提问内容 user: 用户标识 conversation_id: 会话ID(可选) max_retries: 最大重试次数 Returns: 响应数据字典,失败返回None """ if max_retries is None: max_retries = settings.max_retry_count url = f"{self.base_url}/chat-messages" payload = { "query": query, "response_mode": "blocking", # 使用阻塞模式(非流式) "user": user, "inputs": {}, } # 如果有会话ID,添加到请求中 if conversation_id: payload["conversation_id"] = conversation_id retry_count = 0 while retry_count <= max_retries: try: logger.info( f"发送Dify消息: user={user}, conversation_id={conversation_id}, retry={retry_count}" ) response = self.session.post(url, json=payload, timeout=60) response.raise_for_status() result = response.json() # 检查响应是否成功 if "answer" in result and "conversation_id" in result: logger.info( f"Dify消息发送成功: user={user}, conversation_id={result.get('conversation_id')}" ) return result else: logger.error(f"Dify响应格式异常: user={user}, response={result}") except requests.exceptions.Timeout: logger.warning(f"Dify请求超时: user={user}, retry={retry_count}") except requests.exceptions.RequestException as e: logger.error( f"Dify网络错误: user={user}, retry={retry_count}, error={str(e)}" ) except Exception as e: logger.error( f"Dify请求异常: user={user}, retry={retry_count}, error={str(e)}" ) retry_count += 1 if retry_count <= max_retries: wait_time = settings.retry_delay * retry_count logger.info(f"等待重试: user={user}, wait_time={wait_time}s") time.sleep(wait_time) logger.error( f"Dify消息发送失败,已达最大重试次数: user={user}, max_retries={max_retries}" ) return None def get_conversation_messages( self, conversation_id: str, user: str ) -> Optional[List[Dict[str, Any]]]: """ 获取对话历史消息 Args: conversation_id: 会话ID user: 用户标识 Returns: 消息列表,失败返回None """ try: url = f"{self.base_url}/messages" params = {"conversation_id": conversation_id, "user": user} logger.info(f"获取对话历史: conversation_id={conversation_id}, user={user}") response = self.session.get(url, params=params, timeout=30) response.raise_for_status() result = response.json() if "data" in result: logger.info(f"成功获取对话历史: conversation_id={conversation_id}") return result["data"] else: logger.error( f"获取对话历史响应格式异常: conversation_id={conversation_id}, response={result}" ) return None except requests.exceptions.RequestException as e: logger.error( f"获取对话历史网络错误: conversation_id={conversation_id}, error={str(e)}" ) return None except Exception as e: logger.error( f"获取对话历史异常: conversation_id={conversation_id}, error={str(e)}" ) return None # 全局DifyAI客户端实例 dify_client = DifyClient()