From 18c6d2b0dd77b38f487747aad1fcd1218aa8c356 Mon Sep 17 00:00:00 2001 From: yj <2077506045@qq.com> Date: 星期三, 27 八月 2025 09:19:57 +0800 Subject: [PATCH] 1. 新增测试群组关键字,测试群组名称包含关键字,在其中发言一律不触发静默。 --- app/services/dify_client.py | 330 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 329 insertions(+), 1 deletions(-) diff --git a/app/services/dify_client.py b/app/services/dify_client.py index f029c8e..0e6bbda 100644 --- a/app/services/dify_client.py +++ b/app/services/dify_client.py @@ -4,6 +4,7 @@ import requests import time +import json from typing import List, Dict, Optional, Any from loguru import logger from config import settings @@ -27,6 +28,7 @@ user: str, conversation_id: Optional[str] = None, max_retries: int = None, + nick_name: Optional[str] = None, ) -> Optional[Dict[str, Any]]: """ 鍙戦�佸璇濇秷鎭紙闈炴祦寮忔ā寮忥級 @@ -36,6 +38,7 @@ user: 鐢ㄦ埛鏍囪瘑 conversation_id: 浼氳瘽ID锛堝彲閫夛級 max_retries: 鏈�澶ч噸璇曟鏁� + nick_name: 鏄电О锛堝彲閫夛紝鐢ㄤ簬浼犻�掔粰dify锛� Returns: 鍝嶅簲鏁版嵁瀛楀吀锛屽け璐ヨ繑鍥濶one @@ -44,11 +47,17 @@ max_retries = settings.max_retry_count url = f"{self.base_url}/chat-messages" + + # 鏋勫缓inputs鍙傛暟 + inputs = {} + if nick_name: + inputs["nick_name"] = nick_name + payload = { "query": query, "response_mode": "blocking", # 浣跨敤闃诲妯″紡锛堥潪娴佸紡锛� "user": user, - "inputs": {}, + "inputs": inputs, } # 濡傛灉鏈変細璇滻D锛屾坊鍔犲埌璇锋眰涓� @@ -98,6 +107,325 @@ ) return None + def send_chat_message_stream( + self, + query: str, + user: str, + conversation_id: Optional[str] = None, + max_retries: int = None, + nick_name: Optional[str] = None, + ) -> Optional[Dict[str, Any]]: + """ + 鍙戦�佸璇濇秷鎭紙娴佸紡妯″紡锛� + + Args: + query: 鐢ㄦ埛杈撳叆/鎻愰棶鍐呭 + user: 鐢ㄦ埛鏍囪瘑 + conversation_id: 浼氳瘽ID锛堝彲閫夛級 + max_retries: 鏈�澶ч噸璇曟鏁� + nick_name: 鏄电О锛堝彲閫夛紝鐢ㄤ簬浼犻�掔粰dify锛� + + Returns: + 瀹屾暣鐨勫搷搴旀暟鎹瓧鍏革紝澶辫触杩斿洖None + """ + if max_retries is None: + max_retries = settings.max_retry_count + + url = f"{self.base_url}/chat-messages" + + # 鏋勫缓inputs鍙傛暟 + inputs = {} + if nick_name: + inputs["nick_name"] = nick_name + + payload = { + "query": query, + "response_mode": "streaming", # 浣跨敤娴佸紡妯″紡 + "user": user, + "inputs": inputs, + } + + # 濡傛灉鏈変細璇滻D锛屾坊鍔犲埌璇锋眰涓� + if conversation_id: + payload["conversation_id"] = conversation_id + + retry_count = 0 + while retry_count <= max_retries: + try: + logger.info( + f"鍙戦�丏ify娴佸紡娑堟伅: user={user}, conversation_id={conversation_id}, retry={retry_count}" + ) + + response = self.session.post( + url, + json=payload, + timeout=settings.dify_streaming_timeout, + stream=True + ) + response.raise_for_status() + + # 澶勭悊娴佸紡鍝嶅簲 + result = self._process_stream_response(response, user) + + if result: + logger.info( + f"Dify娴佸紡娑堟伅鍙戦�佹垚鍔�: user={user}, conversation_id={result.get('conversation_id')}" + ) + return result + else: + logger.error(f"Dify娴佸紡鍝嶅簲澶勭悊澶辫触: user={user}") + + except requests.exceptions.Timeout: + logger.warning(f"Dify娴佸紡璇锋眰瓒呮椂: user={user}, retry={retry_count}") + except requests.exceptions.RequestException as e: + logger.error( + f"Dify娴佸紡缃戠粶閿欒: user={user}, retry={retry_count}, error={str(e)}" + ) + except Exception as e: + logger.error( + f"Dify娴佸紡璇锋眰寮傚父: user={user}, retry={retry_count}, error={str(e)}" + ) + + retry_count += 1 + if retry_count <= max_retries: + wait_time = settings.retry_delay * retry_count + logger.info(f"绛夊緟閲嶈瘯: user={user}, wait_time={wait_time}s") + time.sleep(wait_time) + + logger.error( + f"Dify娴佸紡娑堟伅鍙戦�佸け璐ワ紝宸茶揪鏈�澶ч噸璇曟鏁�: user={user}, max_retries={max_retries}" + ) + return None + + def _process_stream_response(self, response: requests.Response, user: str) -> Optional[Dict[str, Any]]: + """ + 澶勭悊娴佸紡鍝嶅簲 + + Args: + response: requests鍝嶅簲瀵硅薄 + user: 鐢ㄦ埛鏍囪瘑 + + Returns: + 瀹屾暣鐨勫搷搴旀暟鎹瓧鍏革紝澶辫触杩斿洖None + """ + try: + # 妫�鏌ュ搷搴斿ご + content_type = response.headers.get('content-type', '') + if 'text/event-stream' not in content_type: + logger.warning(f"鍝嶅簲涓嶆槸SSE鏍煎紡: user={user}, content_type={content_type}") + + complete_answer = "" + conversation_id = "" + task_id = "" + message_id = "" + created_at = None + metadata = None + usage = None + retriever_resources = None + message_ended = False # 鏍囪鏄惁鏀跺埌message_end浜嬩欢 + + logger.info(f"寮�濮嬪鐞嗘祦寮忓搷搴�: user={user}") + + # 娣诲姞瓒呮椂鍜岃璁℃暟鍣� + line_count = 0 + max_empty_lines = 50 # 鏈�澶ц繛缁┖琛屾暟锛岄槻姝㈡棤闄愬惊鐜� + + for line in response.iter_lines(decode_unicode=True): + line_count += 1 + + if not line: + # 绌鸿璁℃暟锛岄槻姝㈡棤闄愮瓑寰� + if line_count > max_empty_lines and not complete_answer: + logger.warning(f"娴佸紡鍝嶅簲杩囧绌鸿锛屽彲鑳借繛鎺ュ紓甯�: user={user}, line_count={line_count}") + break + continue + + # 璺宠繃闈炴暟鎹 + if not line.startswith("data: "): + continue + + # 鎻愬彇JSON鏁版嵁 + data_str = line[6:] # 绉婚櫎 "data: " 鍓嶇紑 + + if not data_str.strip(): + continue + + try: + data = json.loads(data_str) + event = data.get("event", "") + + if event == "message": + # 娑堟伅浜嬩欢 - 绱Н绛旀鍐呭 + answer_chunk = data.get("answer", "") + complete_answer += answer_chunk + + # 淇濆瓨鍩烘湰淇℃伅 + if not conversation_id: + conversation_id = data.get("conversation_id", "") + if not task_id: + task_id = data.get("task_id", "") + if not message_id: + message_id = data.get("id", "") + if created_at is None: + created_at = data.get("created_at") + + logger.debug(f"鏀跺埌娑堟伅鍧�: user={user}, chunk_length={len(answer_chunk)}") + + elif event == "message_end": + # 娑堟伅缁撴潫浜嬩欢 - 鑾峰彇鍏冩暟鎹� + metadata = data.get("metadata") + usage = data.get("usage") + retriever_resources = data.get("retriever_resources") + message_ended = True + + logger.info(f"娴佸紡鍝嶅簲瀹屾垚: user={user}, total_length={len(complete_answer)}") + break + + elif event == "message_file": + # 鏂囦欢浜嬩欢 - 璁板綍浣嗙户缁鐞� + logger.debug(f"鏀跺埌鏂囦欢浜嬩欢: user={user}, file_type={data.get('type')}") + continue + + elif event == "message_replace": + # 娑堟伅鏇挎崲浜嬩欢 - 鏇挎崲绛旀鍐呭 + replace_answer = data.get("answer", "") + if replace_answer: + complete_answer = replace_answer + logger.info(f"娑堟伅鍐呭琚浛鎹�: user={user}, new_length={len(complete_answer)}") + continue + + elif event in ["workflow_started", "node_started", "node_finished", "workflow_finished"]: + # 宸ヤ綔娴佺浉鍏充簨浠� - 璁板綍浣嗙户缁鐞� + logger.debug(f"鏀跺埌宸ヤ綔娴佷簨浠�: user={user}, event={event}") + continue + + elif event in ["tts_message", "tts_message_end"]: + # TTS闊抽浜嬩欢 - 璁板綍浣嗙户缁鐞� + logger.debug(f"鏀跺埌TTS浜嬩欢: user={user}, event={event}") + continue + + elif event in ["agent_thought", "agent_message"]: + # Agent鐩稿叧浜嬩欢 - 闇�瑕佺壒娈婂鐞� + logger.debug(f"鏀跺埌Agent浜嬩欢: user={user}, event={event}, data_keys={list(data.keys())}") + + # 浠巃gent浜嬩欢涓彁鍙朿onversation_id锛堝鏋滄湁鐨勮瘽锛� + if not conversation_id and data.get("conversation_id"): + conversation_id = data.get("conversation_id") + logger.info(f"浠嶢gent浜嬩欢鑾峰彇conversation_id: user={user}, conversation_id={conversation_id}") + + # 浠巃gent浜嬩欢涓彁鍙栧熀鏈俊鎭紙濡傛灉鏈夌殑璇濓級 + if not task_id and data.get("task_id"): + task_id = data.get("task_id") + logger.debug(f"浠嶢gent浜嬩欢鑾峰彇task_id: user={user}, task_id={task_id}") + if not message_id and data.get("id"): + message_id = data.get("id") + logger.debug(f"浠嶢gent浜嬩欢鑾峰彇message_id: user={user}, message_id={message_id}") + if created_at is None and data.get("created_at"): + created_at = data.get("created_at") + logger.debug(f"浠嶢gent浜嬩欢鑾峰彇created_at: user={user}, created_at={created_at}") + + # 妫�鏌gent_message鏄惁鍖呭惈answer鍐呭 + if event == "agent_message" and data.get("answer"): + agent_answer = data.get("answer", "") + complete_answer += agent_answer + logger.debug(f"浠嶢gent娑堟伅鑾峰彇鍐呭: user={user}, chunk_length={len(agent_answer)}") + + continue + + elif event == "error": + # 閿欒浜嬩欢 + error_msg = data.get("message", "鏈煡閿欒") + logger.error(f"娴佸紡鍝嶅簲閿欒: user={user}, error={error_msg}") + return None + + elif event == "ping": + # ping浜嬩欢 - 淇濇寔杩炴帴 + logger.debug(f"鏀跺埌ping浜嬩欢: user={user}") + continue + + else: + # 鏈煡浜嬩欢绫诲瀷 - 璁板綍浣嗙户缁鐞� + logger.debug(f"鏀跺埌鏈煡浜嬩欢: user={user}, event={event}") + continue + + except json.JSONDecodeError as e: + logger.warning(f"瑙f瀽娴佸紡鏁版嵁JSON澶辫触: user={user}, data={data_str}, error={str(e)}") + continue + + # 鏋勫缓瀹屾暣鍝嶅簲 + # 瀵逛簬Agent妯″紡锛屽彲鑳芥病鏈塩onversation_id锛屼絾鏈塼ask_id + # 鍦ㄨ繖绉嶆儏鍐典笅锛屾垜浠彲浠ヤ娇鐢╰ask_id浣滀负conversation_id鐨勬浛浠� + if conversation_id or (task_id and (complete_answer or message_ended)): + # 濡傛灉娌℃湁conversation_id浣嗘湁task_id锛屼娇鐢╰ask_id + final_conversation_id = conversation_id or task_id + + result = { + "event": "message", + "task_id": task_id, + "id": message_id, + "message_id": message_id, + "conversation_id": final_conversation_id, + "mode": "chat", + "answer": complete_answer, # 鍙兘涓虹┖瀛楃涓� + "created_at": created_at + } + + # 娣诲姞鍙�夊瓧娈� + if metadata: + result["metadata"] = metadata + if usage: + result["usage"] = usage + if retriever_resources: + result["retriever_resources"] = retriever_resources + + if complete_answer: + logger.info(f"娴佸紡鍝嶅簲澶勭悊鎴愬姛: user={user}, answer_length={len(complete_answer)}, conversation_id={final_conversation_id}, message_ended={message_ended}") + else: + logger.info(f"娴佸紡鍝嶅簲澶勭悊鎴愬姛(鏃犲唴瀹�): user={user}, conversation_id={final_conversation_id}, message_ended={message_ended}") + return result + else: + logger.error(f"娴佸紡鍝嶅簲涓嶅畬鏁�: user={user}, answer={bool(complete_answer)}, conversation_id={bool(conversation_id)}, task_id={bool(task_id)}") + # 璁板綍鏇村璋冭瘯淇℃伅 + logger.debug(f"璋冭瘯淇℃伅: task_id={task_id}, message_id={message_id}, created_at={created_at}, message_ended={message_ended}") + return None + + except Exception as e: + logger.error(f"澶勭悊娴佸紡鍝嶅簲寮傚父: user={user}, error={str(e)}") + return None + + def send_message( + self, + query: str, + user: str, + conversation_id: Optional[str] = None, + max_retries: int = None, + force_streaming: Optional[bool] = None, + nick_name: Optional[str] = None, + ) -> Optional[Dict[str, Any]]: + """ + 鍙戦�佸璇濇秷鎭紙鏍规嵁閰嶇疆閫夋嫨妯″紡锛� + + Args: + query: 鐢ㄦ埛杈撳叆/鎻愰棶鍐呭 + user: 鐢ㄦ埛鏍囪瘑 + conversation_id: 浼氳瘽ID锛堝彲閫夛級 + max_retries: 鏈�澶ч噸璇曟鏁� + force_streaming: 寮哄埗浣跨敤娴佸紡妯″紡锛堝彲閫夛紝瑕嗙洊閰嶇疆锛� + nick_name: 鏄电О锛堝彲閫夛紝鐢ㄤ簬浼犻�掔粰dify锛� + + Returns: + 鍝嶅簲鏁版嵁瀛楀吀锛屽け璐ヨ繑鍥濶one + """ + # 纭畾浣跨敤鍝妯″紡 + use_streaming = force_streaming if force_streaming is not None else settings.dify_streaming_enabled + + if use_streaming: + logger.info(f"浣跨敤娴佸紡妯″紡鍙戦�佹秷鎭�: user={user}") + return self.send_chat_message_stream(query, user, conversation_id, max_retries, nick_name) + else: + logger.info(f"浣跨敤闃诲妯″紡鍙戦�佹秷鎭�: user={user}") + return self.send_chat_message(query, user, conversation_id, max_retries, nick_name) + def get_conversation_messages( self, conversation_id: str, user: str ) -> Optional[List[Dict[str, Any]]]: -- Gitblit v1.9.1