From 69945b730fd3f6b6138ce50e49fc3392fcd74d71 Mon Sep 17 00:00:00 2001
From: yj <2077506045@qq.com>
Date: 星期一, 28 七月 2025 18:16:52 +0800
Subject: [PATCH] 新增关键词过滤;新增活跃客服统计;新增掉线通知

---
 app/services/dify_client.py |  330 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 329 insertions(+), 1 deletions(-)

diff --git a/app/services/dify_client.py b/app/services/dify_client.py
index f029c8e..0e6bbda 100644
--- a/app/services/dify_client.py
+++ b/app/services/dify_client.py
@@ -4,6 +4,7 @@
 
 import requests
 import time
+import json
 from typing import List, Dict, Optional, Any
 from loguru import logger
 from config import settings
@@ -27,6 +28,7 @@
         user: str,
         conversation_id: Optional[str] = None,
         max_retries: int = None,
+        nick_name: Optional[str] = None,
     ) -> Optional[Dict[str, Any]]:
         """
         鍙戦�佸璇濇秷鎭紙闈炴祦寮忔ā寮忥級
@@ -36,6 +38,7 @@
             user: 鐢ㄦ埛鏍囪瘑
             conversation_id: 浼氳瘽ID锛堝彲閫夛級
             max_retries: 鏈�澶ч噸璇曟鏁�
+            nick_name: 鏄电О锛堝彲閫夛紝鐢ㄤ簬浼犻�掔粰dify锛�
 
         Returns:
             鍝嶅簲鏁版嵁瀛楀吀锛屽け璐ヨ繑鍥濶one
@@ -44,11 +47,17 @@
             max_retries = settings.max_retry_count
 
         url = f"{self.base_url}/chat-messages"
+
+        # 鏋勫缓inputs鍙傛暟
+        inputs = {}
+        if nick_name:
+            inputs["nick_name"] = nick_name
+
         payload = {
             "query": query,
             "response_mode": "blocking",  # 浣跨敤闃诲妯″紡锛堥潪娴佸紡锛�
             "user": user,
-            "inputs": {},
+            "inputs": inputs,
         }
 
         # 濡傛灉鏈変細璇滻D锛屾坊鍔犲埌璇锋眰涓�
@@ -98,6 +107,325 @@
         )
         return None
 
+    def send_chat_message_stream(
+        self,
+        query: str,
+        user: str,
+        conversation_id: Optional[str] = None,
+        max_retries: int = None,
+        nick_name: Optional[str] = None,
+    ) -> Optional[Dict[str, Any]]:
+        """
+        鍙戦�佸璇濇秷鎭紙娴佸紡妯″紡锛�
+
+        Args:
+            query: 鐢ㄦ埛杈撳叆/鎻愰棶鍐呭
+            user: 鐢ㄦ埛鏍囪瘑
+            conversation_id: 浼氳瘽ID锛堝彲閫夛級
+            max_retries: 鏈�澶ч噸璇曟鏁�
+            nick_name: 鏄电О锛堝彲閫夛紝鐢ㄤ簬浼犻�掔粰dify锛�
+
+        Returns:
+            瀹屾暣鐨勫搷搴旀暟鎹瓧鍏革紝澶辫触杩斿洖None
+        """
+        if max_retries is None:
+            max_retries = settings.max_retry_count
+
+        url = f"{self.base_url}/chat-messages"
+
+        # 鏋勫缓inputs鍙傛暟
+        inputs = {}
+        if nick_name:
+            inputs["nick_name"] = nick_name
+
+        payload = {
+            "query": query,
+            "response_mode": "streaming",  # 浣跨敤娴佸紡妯″紡
+            "user": user,
+            "inputs": inputs,
+        }
+
+        # 濡傛灉鏈変細璇滻D锛屾坊鍔犲埌璇锋眰涓�
+        if conversation_id:
+            payload["conversation_id"] = conversation_id
+
+        retry_count = 0
+        while retry_count <= max_retries:
+            try:
+                logger.info(
+                    f"鍙戦�丏ify娴佸紡娑堟伅: user={user}, conversation_id={conversation_id}, retry={retry_count}"
+                )
+
+                response = self.session.post(
+                    url,
+                    json=payload,
+                    timeout=settings.dify_streaming_timeout,
+                    stream=True
+                )
+                response.raise_for_status()
+
+                # 澶勭悊娴佸紡鍝嶅簲
+                result = self._process_stream_response(response, user)
+
+                if result:
+                    logger.info(
+                        f"Dify娴佸紡娑堟伅鍙戦�佹垚鍔�: user={user}, conversation_id={result.get('conversation_id')}"
+                    )
+                    return result
+                else:
+                    logger.error(f"Dify娴佸紡鍝嶅簲澶勭悊澶辫触: user={user}")
+
+            except requests.exceptions.Timeout:
+                logger.warning(f"Dify娴佸紡璇锋眰瓒呮椂: user={user}, retry={retry_count}")
+            except requests.exceptions.RequestException as e:
+                logger.error(
+                    f"Dify娴佸紡缃戠粶閿欒: user={user}, retry={retry_count}, error={str(e)}"
+                )
+            except Exception as e:
+                logger.error(
+                    f"Dify娴佸紡璇锋眰寮傚父: user={user}, retry={retry_count}, error={str(e)}"
+                )
+
+            retry_count += 1
+            if retry_count <= max_retries:
+                wait_time = settings.retry_delay * retry_count
+                logger.info(f"绛夊緟閲嶈瘯: user={user}, wait_time={wait_time}s")
+                time.sleep(wait_time)
+
+        logger.error(
+            f"Dify娴佸紡娑堟伅鍙戦�佸け璐ワ紝宸茶揪鏈�澶ч噸璇曟鏁�: user={user}, max_retries={max_retries}"
+        )
+        return None
+
+    def _process_stream_response(self, response: requests.Response, user: str) -> Optional[Dict[str, Any]]:
+        """
+        澶勭悊娴佸紡鍝嶅簲
+
+        Args:
+            response: requests鍝嶅簲瀵硅薄
+            user: 鐢ㄦ埛鏍囪瘑
+
+        Returns:
+            瀹屾暣鐨勫搷搴旀暟鎹瓧鍏革紝澶辫触杩斿洖None
+        """
+        try:
+            # 妫�鏌ュ搷搴斿ご
+            content_type = response.headers.get('content-type', '')
+            if 'text/event-stream' not in content_type:
+                logger.warning(f"鍝嶅簲涓嶆槸SSE鏍煎紡: user={user}, content_type={content_type}")
+
+            complete_answer = ""
+            conversation_id = ""
+            task_id = ""
+            message_id = ""
+            created_at = None
+            metadata = None
+            usage = None
+            retriever_resources = None
+            message_ended = False  # 鏍囪鏄惁鏀跺埌message_end浜嬩欢
+
+            logger.info(f"寮�濮嬪鐞嗘祦寮忓搷搴�: user={user}")
+
+            # 娣诲姞瓒呮椂鍜岃璁℃暟鍣�
+            line_count = 0
+            max_empty_lines = 50  # 鏈�澶ц繛缁┖琛屾暟锛岄槻姝㈡棤闄愬惊鐜�
+
+            for line in response.iter_lines(decode_unicode=True):
+                line_count += 1
+
+                if not line:
+                    # 绌鸿璁℃暟锛岄槻姝㈡棤闄愮瓑寰�
+                    if line_count > max_empty_lines and not complete_answer:
+                        logger.warning(f"娴佸紡鍝嶅簲杩囧绌鸿锛屽彲鑳借繛鎺ュ紓甯�: user={user}, line_count={line_count}")
+                        break
+                    continue
+
+                # 璺宠繃闈炴暟鎹
+                if not line.startswith("data: "):
+                    continue
+
+                # 鎻愬彇JSON鏁版嵁
+                data_str = line[6:]  # 绉婚櫎 "data: " 鍓嶇紑
+
+                if not data_str.strip():
+                    continue
+
+                try:
+                    data = json.loads(data_str)
+                    event = data.get("event", "")
+
+                    if event == "message":
+                        # 娑堟伅浜嬩欢 - 绱Н绛旀鍐呭
+                        answer_chunk = data.get("answer", "")
+                        complete_answer += answer_chunk
+
+                        # 淇濆瓨鍩烘湰淇℃伅
+                        if not conversation_id:
+                            conversation_id = data.get("conversation_id", "")
+                        if not task_id:
+                            task_id = data.get("task_id", "")
+                        if not message_id:
+                            message_id = data.get("id", "")
+                        if created_at is None:
+                            created_at = data.get("created_at")
+
+                        logger.debug(f"鏀跺埌娑堟伅鍧�: user={user}, chunk_length={len(answer_chunk)}")
+
+                    elif event == "message_end":
+                        # 娑堟伅缁撴潫浜嬩欢 - 鑾峰彇鍏冩暟鎹�
+                        metadata = data.get("metadata")
+                        usage = data.get("usage")
+                        retriever_resources = data.get("retriever_resources")
+                        message_ended = True
+
+                        logger.info(f"娴佸紡鍝嶅簲瀹屾垚: user={user}, total_length={len(complete_answer)}")
+                        break
+
+                    elif event == "message_file":
+                        # 鏂囦欢浜嬩欢 - 璁板綍浣嗙户缁鐞�
+                        logger.debug(f"鏀跺埌鏂囦欢浜嬩欢: user={user}, file_type={data.get('type')}")
+                        continue
+
+                    elif event == "message_replace":
+                        # 娑堟伅鏇挎崲浜嬩欢 - 鏇挎崲绛旀鍐呭
+                        replace_answer = data.get("answer", "")
+                        if replace_answer:
+                            complete_answer = replace_answer
+                            logger.info(f"娑堟伅鍐呭琚浛鎹�: user={user}, new_length={len(complete_answer)}")
+                        continue
+
+                    elif event in ["workflow_started", "node_started", "node_finished", "workflow_finished"]:
+                        # 宸ヤ綔娴佺浉鍏充簨浠� - 璁板綍浣嗙户缁鐞�
+                        logger.debug(f"鏀跺埌宸ヤ綔娴佷簨浠�: user={user}, event={event}")
+                        continue
+
+                    elif event in ["tts_message", "tts_message_end"]:
+                        # TTS闊抽浜嬩欢 - 璁板綍浣嗙户缁鐞�
+                        logger.debug(f"鏀跺埌TTS浜嬩欢: user={user}, event={event}")
+                        continue
+
+                    elif event in ["agent_thought", "agent_message"]:
+                        # Agent鐩稿叧浜嬩欢 - 闇�瑕佺壒娈婂鐞�
+                        logger.debug(f"鏀跺埌Agent浜嬩欢: user={user}, event={event}, data_keys={list(data.keys())}")
+
+                        # 浠巃gent浜嬩欢涓彁鍙朿onversation_id锛堝鏋滄湁鐨勮瘽锛�
+                        if not conversation_id and data.get("conversation_id"):
+                            conversation_id = data.get("conversation_id")
+                            logger.info(f"浠嶢gent浜嬩欢鑾峰彇conversation_id: user={user}, conversation_id={conversation_id}")
+
+                        # 浠巃gent浜嬩欢涓彁鍙栧熀鏈俊鎭紙濡傛灉鏈夌殑璇濓級
+                        if not task_id and data.get("task_id"):
+                            task_id = data.get("task_id")
+                            logger.debug(f"浠嶢gent浜嬩欢鑾峰彇task_id: user={user}, task_id={task_id}")
+                        if not message_id and data.get("id"):
+                            message_id = data.get("id")
+                            logger.debug(f"浠嶢gent浜嬩欢鑾峰彇message_id: user={user}, message_id={message_id}")
+                        if created_at is None and data.get("created_at"):
+                            created_at = data.get("created_at")
+                            logger.debug(f"浠嶢gent浜嬩欢鑾峰彇created_at: user={user}, created_at={created_at}")
+
+                        # 妫�鏌gent_message鏄惁鍖呭惈answer鍐呭
+                        if event == "agent_message" and data.get("answer"):
+                            agent_answer = data.get("answer", "")
+                            complete_answer += agent_answer
+                            logger.debug(f"浠嶢gent娑堟伅鑾峰彇鍐呭: user={user}, chunk_length={len(agent_answer)}")
+
+                        continue
+
+                    elif event == "error":
+                        # 閿欒浜嬩欢
+                        error_msg = data.get("message", "鏈煡閿欒")
+                        logger.error(f"娴佸紡鍝嶅簲閿欒: user={user}, error={error_msg}")
+                        return None
+
+                    elif event == "ping":
+                        # ping浜嬩欢 - 淇濇寔杩炴帴
+                        logger.debug(f"鏀跺埌ping浜嬩欢: user={user}")
+                        continue
+
+                    else:
+                        # 鏈煡浜嬩欢绫诲瀷 - 璁板綍浣嗙户缁鐞�
+                        logger.debug(f"鏀跺埌鏈煡浜嬩欢: user={user}, event={event}")
+                        continue
+
+                except json.JSONDecodeError as e:
+                    logger.warning(f"瑙f瀽娴佸紡鏁版嵁JSON澶辫触: user={user}, data={data_str}, error={str(e)}")
+                    continue
+
+            # 鏋勫缓瀹屾暣鍝嶅簲
+            # 瀵逛簬Agent妯″紡锛屽彲鑳芥病鏈塩onversation_id锛屼絾鏈塼ask_id
+            # 鍦ㄨ繖绉嶆儏鍐典笅锛屾垜浠彲浠ヤ娇鐢╰ask_id浣滀负conversation_id鐨勬浛浠�
+            if conversation_id or (task_id and (complete_answer or message_ended)):
+                # 濡傛灉娌℃湁conversation_id浣嗘湁task_id锛屼娇鐢╰ask_id
+                final_conversation_id = conversation_id or task_id
+
+                result = {
+                    "event": "message",
+                    "task_id": task_id,
+                    "id": message_id,
+                    "message_id": message_id,
+                    "conversation_id": final_conversation_id,
+                    "mode": "chat",
+                    "answer": complete_answer,  # 鍙兘涓虹┖瀛楃涓�
+                    "created_at": created_at
+                }
+
+                # 娣诲姞鍙�夊瓧娈�
+                if metadata:
+                    result["metadata"] = metadata
+                if usage:
+                    result["usage"] = usage
+                if retriever_resources:
+                    result["retriever_resources"] = retriever_resources
+
+                if complete_answer:
+                    logger.info(f"娴佸紡鍝嶅簲澶勭悊鎴愬姛: user={user}, answer_length={len(complete_answer)}, conversation_id={final_conversation_id}, message_ended={message_ended}")
+                else:
+                    logger.info(f"娴佸紡鍝嶅簲澶勭悊鎴愬姛(鏃犲唴瀹�): user={user}, conversation_id={final_conversation_id}, message_ended={message_ended}")
+                return result
+            else:
+                logger.error(f"娴佸紡鍝嶅簲涓嶅畬鏁�: user={user}, answer={bool(complete_answer)}, conversation_id={bool(conversation_id)}, task_id={bool(task_id)}")
+                # 璁板綍鏇村璋冭瘯淇℃伅
+                logger.debug(f"璋冭瘯淇℃伅: task_id={task_id}, message_id={message_id}, created_at={created_at}, message_ended={message_ended}")
+                return None
+
+        except Exception as e:
+            logger.error(f"澶勭悊娴佸紡鍝嶅簲寮傚父: user={user}, error={str(e)}")
+            return None
+
+    def send_message(
+        self,
+        query: str,
+        user: str,
+        conversation_id: Optional[str] = None,
+        max_retries: int = None,
+        force_streaming: Optional[bool] = None,
+        nick_name: Optional[str] = None,
+    ) -> Optional[Dict[str, Any]]:
+        """
+        鍙戦�佸璇濇秷鎭紙鏍规嵁閰嶇疆閫夋嫨妯″紡锛�
+
+        Args:
+            query: 鐢ㄦ埛杈撳叆/鎻愰棶鍐呭
+            user: 鐢ㄦ埛鏍囪瘑
+            conversation_id: 浼氳瘽ID锛堝彲閫夛級
+            max_retries: 鏈�澶ч噸璇曟鏁�
+            force_streaming: 寮哄埗浣跨敤娴佸紡妯″紡锛堝彲閫夛紝瑕嗙洊閰嶇疆锛�
+            nick_name: 鏄电О锛堝彲閫夛紝鐢ㄤ簬浼犻�掔粰dify锛�
+
+        Returns:
+            鍝嶅簲鏁版嵁瀛楀吀锛屽け璐ヨ繑鍥濶one
+        """
+        # 纭畾浣跨敤鍝妯″紡
+        use_streaming = force_streaming if force_streaming is not None else settings.dify_streaming_enabled
+
+        if use_streaming:
+            logger.info(f"浣跨敤娴佸紡妯″紡鍙戦�佹秷鎭�: user={user}")
+            return self.send_chat_message_stream(query, user, conversation_id, max_retries, nick_name)
+        else:
+            logger.info(f"浣跨敤闃诲妯″紡鍙戦�佹秷鎭�: user={user}")
+            return self.send_chat_message(query, user, conversation_id, max_retries, nick_name)
+
     def get_conversation_messages(
         self, conversation_id: str, user: str
     ) -> Optional[List[Dict[str, Any]]]:

--
Gitblit v1.9.1