From 99266ea57913663f9880c512726c42cb7e5e7f28 Mon Sep 17 00:00:00 2001
From: yj <2077506045@qq.com>
Date: 星期一, 28 七月 2025 11:14:28 +0800
Subject: [PATCH] 新增忽略好友消息;删除多余文件

---
 app/services/message_processor.py |  170 ++++++++++++++++++++++++++++++++++++++++++++++----------
 1 files changed, 139 insertions(+), 31 deletions(-)

diff --git a/app/services/message_processor.py b/app/services/message_processor.py
index 11308fc..e70555a 100644
--- a/app/services/message_processor.py
+++ b/app/services/message_processor.py
@@ -3,7 +3,10 @@
 """
 
 import json
-from typing import Dict, Any, Optional
+import time
+import re
+from typing import Dict, Any, Optional, List, Tuple
+from datetime import datetime
 from sqlalchemy.orm import Session
 from loguru import logger
 
@@ -13,6 +16,8 @@
 from app.services.redis_queue import redis_queue
 from app.services.ecloud_client import ecloud_client
 from app.services.dify_client import dify_client
+from app.services.friend_ignore_service import friend_ignore_service
+from config import settings
 
 
 class MessageProcessor:
@@ -20,6 +25,51 @@
 
     def __init__(self):
         pass
+
+    def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]:
+        """
+        瑙f瀽AI鍥炲涓殑@瀛楃锛屾彁鍙栧鏈嶅悕绉�
+
+        Args:
+            ai_answer: AI鍥炲鍐呭
+
+        Returns:
+            (澶勭悊鍚庣殑娑堟伅鍐呭, 闇�瑕丂鐨勫鏈峸cid鍒楄〃)
+        """
+        try:
+            # 鑾峰彇閰嶇疆鐨勫鏈嶅悕绉板垪琛�
+            customer_service_names = settings.customer_service_names
+
+            # 鏌ユ壘鎵�鏈堾瀛楃鍚庣殑瀹㈡湇鍚嶇О
+            at_pattern = r'@([^\s]+)'
+            matches = re.findall(at_pattern, ai_answer)
+
+            valid_at_names = []
+            at_wc_ids = []
+
+            for match in matches:
+                # 妫�鏌ユ槸鍚﹀湪閰嶇疆鐨勫鏈嶅悕绉板垪琛ㄤ腑
+                if match in customer_service_names:
+                    valid_at_names.append(match)
+                    logger.info(f"鍙戠幇鏈夋晥鐨凘瀹㈡湇鍚嶇О: {match}")
+
+            # 濡傛灉鏈夋湁鏁堢殑@瀹㈡湇鍚嶇О锛屾煡璇㈡暟鎹簱鑾峰彇wcid
+            if valid_at_names:
+                with next(get_db()) as db:
+                    for name in valid_at_names:
+                        # 鏍规嵁nick_name鏌ユ壘鑱旂郴浜�
+                        contact = db.query(Contact).filter(Contact.nick_name == name).first()
+                        if contact:
+                            at_wc_ids.append(contact.wc_id)
+                            logger.info(f"鎵惧埌瀹㈡湇鑱旂郴浜�: name={name}, wc_id={contact.wc_id}")
+                        else:
+                            logger.warning(f"鏈壘鍒板鏈嶈仈绯讳汉: name={name}")
+
+            return ai_answer, at_wc_ids
+
+        except Exception as e:
+            logger.error(f"瑙f瀽@瀛楃寮傚父: error={str(e)}")
+            return ai_answer, []
 
     def is_valid_group_message(self, callback_data: Dict[str, Any]) -> bool:
         """
@@ -50,6 +100,12 @@
             or not data.get("content")
         ):
             logger.warning(f"娑堟伅缂哄皯蹇呰瀛楁: data={data}")
+            return False
+
+        # 妫�鏌ュ彂閫佽�呮槸鍚﹀湪濂藉弸蹇界暐鍒楄〃涓�
+        from_user = data.get("fromUser")
+        if friend_ignore_service.is_friend_ignored(from_user):
+            logger.info(f"蹇界暐濂藉弸鍙戦�佺殑娑堟伅: fromUser={from_user}")
             return False
 
         return True
@@ -151,20 +207,18 @@
 
             logger.info(f"寮�濮嬪鐞嗘秷鎭�: from_user={from_user}, from_group={from_group}")
 
-            # 鑾峰彇鏁版嵁搴撲細璇�
-            db = next(get_db())
-
-            try:
+            # 浣跨敤涓婁笅鏂囩鐞嗗櫒纭繚鏁版嵁搴撲細璇濇纭鐞�
+            with next(get_db()) as db:
                 # 3.1 纭繚鑱旂郴浜轰俊鎭瓨鍦�
                 if not self.ensure_contact_exists(from_group, w_id, db):
                     logger.error(f"鑱旂郴浜轰俊鎭鐞嗗け璐�: from_group={from_group}")
                     return False
 
-                # 3.2 鑾峰彇鐢ㄦ埛鐨刢onversation_id
-                conversation_id = redis_queue.get_conversation_id(from_user)
+                # 3.2 鑾峰彇鐢ㄦ埛鍦ㄥ綋鍓嶇兢缁勭殑conversation_id
+                conversation_id = redis_queue.get_conversation_id(from_user, from_group)
 
-                # 璋冪敤Dify鎺ュ彛鍙戦�佹秷鎭�
-                dify_response = dify_client.send_chat_message(
+                # 璋冪敤Dify鎺ュ彛鍙戦�佹秷鎭紙鏍规嵁閰嶇疆閫夋嫨妯″紡锛�
+                dify_response = dify_client.send_message(
                     query=content, user=from_user, conversation_id=conversation_id
                 )
 
@@ -176,55 +230,112 @@
                 ai_answer = dify_response.get("answer", "")
                 new_conversation_id = dify_response.get("conversation_id", "")
 
-                # 鏇存柊Redis涓殑conversation_id
+                # 鏇存柊Redis涓殑conversation_id锛堝熀浜庣敤鎴�+缇ょ粍锛�
                 if new_conversation_id:
-                    redis_queue.set_conversation_id(from_user, new_conversation_id)
+                    redis_queue.set_conversation_id(from_user, new_conversation_id, from_group)
 
                 # 3.3 淇濆瓨瀵硅瘽璁板綍鍒版暟鎹簱
-                user_conversation_key = f"{from_user}_{new_conversation_id}"
+                # 鎸夌敤鎴枫�佺兢缁勫拰灏忔椂鍒嗙粍瀵硅瘽璁板綍
+                current_time = datetime.now()
+                hour_key = current_time.strftime("%Y%m%d_%H")
 
-                # 妫�鏌ユ槸鍚﹀凡瀛樺湪璁板綍
+                # 鏌ユ壘褰撳墠鐢ㄦ埛鍦ㄥ綋鍓嶇兢缁勫綋鍓嶅皬鏃剁殑瀵硅瘽璁板綍
                 existing_conversation = (
                     db.query(Conversation)
-                    .filter(Conversation.user_conversation_key == user_conversation_key)
+                    .filter(
+                        Conversation.from_user == from_user,
+                        Conversation.conversation_id == new_conversation_id,
+                        Conversation.group == from_group,
+                        Conversation.hour == hour_key
+                    )
                     .first()
                 )
 
                 if existing_conversation:
-                    # 鏇存柊鐜版湁璁板綍
-                    existing_conversation.user_question = content
-                    existing_conversation.ai_answer = ai_answer
-                    existing_conversation.is_processed = True
-                    logger.info(f"鏇存柊瀵硅瘽璁板綍: key={user_conversation_key}")
+                    # 鏇存柊鐜版湁璁板綍 - 浣跨敤JSON鏍煎紡杩藉姞瀵硅瘽鍐呭锛堝綋鍓嶇敤鎴峰湪褰撳墠缇ょ粍褰撳墠灏忔椂鐨勫璇濓級
+                    try:
+                        # 瑙f瀽鐜版湁鐨刢ontent JSON
+                        if existing_conversation.content:
+                            content_list = json.loads(existing_conversation.content)
+                        else:
+                            content_list = []
+
+                        # 杩藉姞鏂扮殑瀵硅瘽鍐呭
+                        content_list.append({
+                            "user": content,
+                            "ai": ai_answer
+                        })
+
+                        # 鏇存柊璁板綍
+                        existing_conversation.content = json.dumps(content_list, ensure_ascii=False)
+                        existing_conversation.is_processed = True
+                        logger.info(f"杩藉姞鍒板綋鍓嶇敤鎴风兢缁勫皬鏃跺璇濊褰�: user={from_user}, group={from_group}, hour={hour_key}, 瀵硅瘽杞={len(content_list)}")
+                    except json.JSONDecodeError as e:
+                        logger.error(f"瑙f瀽鐜版湁瀵硅瘽鍐呭JSON澶辫触: {str(e)}, 閲嶆柊鍒涘缓")
+                        # 濡傛灉JSON瑙f瀽澶辫触锛岄噸鏂板垱寤篶ontent
+                        content_list = [{"user": content, "ai": ai_answer}]
+                        existing_conversation.content = json.dumps(content_list, ensure_ascii=False)
+                        existing_conversation.is_processed = True
                 else:
-                    # 鍒涘缓鏂拌褰�
+                    # 鍒涘缓鏂拌褰� - 鏂扮殑鐢ㄦ埛缇ょ粍灏忔椂瀵硅瘽鎴栭娆″璇濓紝浣跨敤JSON鏍煎紡瀛樺偍瀵硅瘽鍐呭
+                    content_list = [{"user": content, "ai": ai_answer}]
                     new_conversation = Conversation(
-                        user_conversation_key=user_conversation_key,
                         from_user=from_user,
                         conversation_id=new_conversation_id,
-                        user_question=content,
-                        ai_answer=ai_answer,
+                        group=from_group,
+                        hour=hour_key,
+                        content=json.dumps(content_list, ensure_ascii=False),
                         is_processed=True,
                     )
                     db.add(new_conversation)
-                    logger.info(f"鍒涘缓瀵硅瘽璁板綍: key={user_conversation_key}")
+                    logger.info(f"鍒涘缓鏂扮殑鐢ㄦ埛缇ょ粍灏忔椂瀵硅瘽璁板綍: user={from_user}, group={from_group}, hour={hour_key}, 鍒濆瀵硅瘽杞=1")
 
                 db.commit()
 
                 # 鍙戦�丄I鍥炵瓟鍒扮兢鑱�
-                if ai_answer and ecloud_client.send_group_message(
-                    w_id, from_group, ai_answer
-                ):
+                success = False
+                if ai_answer:
+                    # 瑙f瀽AI鍥炲涓殑@瀛楃
+                    processed_answer, at_wc_ids = self.parse_at_mentions(ai_answer)
+
+                    # 鍙戦�佹秷鎭紝鏈�澶氶噸璇�3娆�
+                    for attempt in range(3):
+                        if at_wc_ids:
+                            # 濡傛灉鏈堾瀹㈡湇锛屼娇鐢ㄧ兢鑱夽鎺ュ彛
+                            logger.info(f"浣跨敤缇よ亰@鎺ュ彛鍙戦�佹秷鎭�: at_wc_ids={at_wc_ids}")
+                            if ecloud_client.send_group_at_message(
+                                w_id, from_group, processed_answer, at_wc_ids
+                            ):
+                                success = True
+                                break
+                        else:
+                            # 鏅�氱兢鑱婃秷鎭�
+                            logger.info("浣跨敤鏅�氱兢鑱婃帴鍙e彂閫佹秷鎭�")
+                            if ecloud_client.send_group_message(
+                                w_id, from_group, processed_answer
+                            ):
+                                success = True
+                                break
+
+                        logger.warning(f"鍙戦�丄I鍥炵瓟澶辫触锛屽皾璇曢噸璇� ({attempt + 1}/3): from_user={from_user}")
+                        if attempt < 2:  # 涓嶆槸鏈�鍚庝竴娆″皾璇曪紝绛夊緟涓�娈垫椂闂村啀閲嶈瘯
+                            time.sleep(2 ** attempt)  # 鎸囨暟閫�閬�
+                
+                if success:
                     # 鏇存柊鍙戦�佺姸鎬�
                     conversation = (
                         db.query(Conversation)
                         .filter(
-                            Conversation.user_conversation_key == user_conversation_key
+                            Conversation.from_user == from_user,
+                            Conversation.conversation_id == new_conversation_id,
+                            Conversation.group == from_group,
+                            Conversation.hour == hour_key
                         )
                         .first()
                     )
                     if conversation:
                         conversation.is_sent = True
+                        conversation.sent_time = current_time
                         db.commit()
 
                     logger.info(f"娑堟伅澶勭悊瀹屾垚: from_user={from_user}")
@@ -232,9 +343,6 @@
                 else:
                     logger.error(f"鍙戦�丄I鍥炵瓟澶辫触: from_user={from_user}")
                     return False
-
-            finally:
-                db.close()
 
         except Exception as e:
             logger.error(f"澶勭悊娑堟伅寮傚父: from_user={from_user}, error={str(e)}")

--
Gitblit v1.9.1