From 99266ea57913663f9880c512726c42cb7e5e7f28 Mon Sep 17 00:00:00 2001 From: yj <2077506045@qq.com> Date: 星期一, 28 七月 2025 11:14:28 +0800 Subject: [PATCH] 新增忽略好友消息;删除多余文件 --- app/services/message_processor.py | 170 ++++++++++++++++++++++++++++++++++++++++++++++---------- 1 files changed, 139 insertions(+), 31 deletions(-) diff --git a/app/services/message_processor.py b/app/services/message_processor.py index 11308fc..e70555a 100644 --- a/app/services/message_processor.py +++ b/app/services/message_processor.py @@ -3,7 +3,10 @@ """ import json -from typing import Dict, Any, Optional +import time +import re +from typing import Dict, Any, Optional, List, Tuple +from datetime import datetime from sqlalchemy.orm import Session from loguru import logger @@ -13,6 +16,8 @@ from app.services.redis_queue import redis_queue from app.services.ecloud_client import ecloud_client from app.services.dify_client import dify_client +from app.services.friend_ignore_service import friend_ignore_service +from config import settings class MessageProcessor: @@ -20,6 +25,51 @@ def __init__(self): pass + + def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]: + """ + 瑙f瀽AI鍥炲涓殑@瀛楃锛屾彁鍙栧鏈嶅悕绉� + + Args: + ai_answer: AI鍥炲鍐呭 + + Returns: + (澶勭悊鍚庣殑娑堟伅鍐呭, 闇�瑕丂鐨勫鏈峸cid鍒楄〃) + """ + try: + # 鑾峰彇閰嶇疆鐨勫鏈嶅悕绉板垪琛� + customer_service_names = settings.customer_service_names + + # 鏌ユ壘鎵�鏈堾瀛楃鍚庣殑瀹㈡湇鍚嶇О + at_pattern = r'@([^\s]+)' + matches = re.findall(at_pattern, ai_answer) + + valid_at_names = [] + at_wc_ids = [] + + for match in matches: + # 妫�鏌ユ槸鍚﹀湪閰嶇疆鐨勫鏈嶅悕绉板垪琛ㄤ腑 + if match in customer_service_names: + valid_at_names.append(match) + logger.info(f"鍙戠幇鏈夋晥鐨凘瀹㈡湇鍚嶇О: {match}") + + # 濡傛灉鏈夋湁鏁堢殑@瀹㈡湇鍚嶇О锛屾煡璇㈡暟鎹簱鑾峰彇wcid + if valid_at_names: + with next(get_db()) as db: + for name in valid_at_names: + # 鏍规嵁nick_name鏌ユ壘鑱旂郴浜� + contact = db.query(Contact).filter(Contact.nick_name == name).first() + if contact: + at_wc_ids.append(contact.wc_id) + logger.info(f"鎵惧埌瀹㈡湇鑱旂郴浜�: name={name}, wc_id={contact.wc_id}") + else: + logger.warning(f"鏈壘鍒板鏈嶈仈绯讳汉: name={name}") + + return ai_answer, at_wc_ids + + except Exception as e: + logger.error(f"瑙f瀽@瀛楃寮傚父: error={str(e)}") + return ai_answer, [] def is_valid_group_message(self, callback_data: Dict[str, Any]) -> bool: """ @@ -50,6 +100,12 @@ or not data.get("content") ): logger.warning(f"娑堟伅缂哄皯蹇呰瀛楁: data={data}") + return False + + # 妫�鏌ュ彂閫佽�呮槸鍚﹀湪濂藉弸蹇界暐鍒楄〃涓� + from_user = data.get("fromUser") + if friend_ignore_service.is_friend_ignored(from_user): + logger.info(f"蹇界暐濂藉弸鍙戦�佺殑娑堟伅: fromUser={from_user}") return False return True @@ -151,20 +207,18 @@ logger.info(f"寮�濮嬪鐞嗘秷鎭�: from_user={from_user}, from_group={from_group}") - # 鑾峰彇鏁版嵁搴撲細璇� - db = next(get_db()) - - try: + # 浣跨敤涓婁笅鏂囩鐞嗗櫒纭繚鏁版嵁搴撲細璇濇纭鐞� + with next(get_db()) as db: # 3.1 纭繚鑱旂郴浜轰俊鎭瓨鍦� if not self.ensure_contact_exists(from_group, w_id, db): logger.error(f"鑱旂郴浜轰俊鎭鐞嗗け璐�: from_group={from_group}") return False - # 3.2 鑾峰彇鐢ㄦ埛鐨刢onversation_id - conversation_id = redis_queue.get_conversation_id(from_user) + # 3.2 鑾峰彇鐢ㄦ埛鍦ㄥ綋鍓嶇兢缁勭殑conversation_id + conversation_id = redis_queue.get_conversation_id(from_user, from_group) - # 璋冪敤Dify鎺ュ彛鍙戦�佹秷鎭� - dify_response = dify_client.send_chat_message( + # 璋冪敤Dify鎺ュ彛鍙戦�佹秷鎭紙鏍规嵁閰嶇疆閫夋嫨妯″紡锛� + dify_response = dify_client.send_message( query=content, user=from_user, conversation_id=conversation_id ) @@ -176,55 +230,112 @@ ai_answer = dify_response.get("answer", "") new_conversation_id = dify_response.get("conversation_id", "") - # 鏇存柊Redis涓殑conversation_id + # 鏇存柊Redis涓殑conversation_id锛堝熀浜庣敤鎴�+缇ょ粍锛� if new_conversation_id: - redis_queue.set_conversation_id(from_user, new_conversation_id) + redis_queue.set_conversation_id(from_user, new_conversation_id, from_group) # 3.3 淇濆瓨瀵硅瘽璁板綍鍒版暟鎹簱 - user_conversation_key = f"{from_user}_{new_conversation_id}" + # 鎸夌敤鎴枫�佺兢缁勫拰灏忔椂鍒嗙粍瀵硅瘽璁板綍 + current_time = datetime.now() + hour_key = current_time.strftime("%Y%m%d_%H") - # 妫�鏌ユ槸鍚﹀凡瀛樺湪璁板綍 + # 鏌ユ壘褰撳墠鐢ㄦ埛鍦ㄥ綋鍓嶇兢缁勫綋鍓嶅皬鏃剁殑瀵硅瘽璁板綍 existing_conversation = ( db.query(Conversation) - .filter(Conversation.user_conversation_key == user_conversation_key) + .filter( + Conversation.from_user == from_user, + Conversation.conversation_id == new_conversation_id, + Conversation.group == from_group, + Conversation.hour == hour_key + ) .first() ) if existing_conversation: - # 鏇存柊鐜版湁璁板綍 - existing_conversation.user_question = content - existing_conversation.ai_answer = ai_answer - existing_conversation.is_processed = True - logger.info(f"鏇存柊瀵硅瘽璁板綍: key={user_conversation_key}") + # 鏇存柊鐜版湁璁板綍 - 浣跨敤JSON鏍煎紡杩藉姞瀵硅瘽鍐呭锛堝綋鍓嶇敤鎴峰湪褰撳墠缇ょ粍褰撳墠灏忔椂鐨勫璇濓級 + try: + # 瑙f瀽鐜版湁鐨刢ontent JSON + if existing_conversation.content: + content_list = json.loads(existing_conversation.content) + else: + content_list = [] + + # 杩藉姞鏂扮殑瀵硅瘽鍐呭 + content_list.append({ + "user": content, + "ai": ai_answer + }) + + # 鏇存柊璁板綍 + existing_conversation.content = json.dumps(content_list, ensure_ascii=False) + existing_conversation.is_processed = True + logger.info(f"杩藉姞鍒板綋鍓嶇敤鎴风兢缁勫皬鏃跺璇濊褰�: user={from_user}, group={from_group}, hour={hour_key}, 瀵硅瘽杞={len(content_list)}") + except json.JSONDecodeError as e: + logger.error(f"瑙f瀽鐜版湁瀵硅瘽鍐呭JSON澶辫触: {str(e)}, 閲嶆柊鍒涘缓") + # 濡傛灉JSON瑙f瀽澶辫触锛岄噸鏂板垱寤篶ontent + content_list = [{"user": content, "ai": ai_answer}] + existing_conversation.content = json.dumps(content_list, ensure_ascii=False) + existing_conversation.is_processed = True else: - # 鍒涘缓鏂拌褰� + # 鍒涘缓鏂拌褰� - 鏂扮殑鐢ㄦ埛缇ょ粍灏忔椂瀵硅瘽鎴栭娆″璇濓紝浣跨敤JSON鏍煎紡瀛樺偍瀵硅瘽鍐呭 + content_list = [{"user": content, "ai": ai_answer}] new_conversation = Conversation( - user_conversation_key=user_conversation_key, from_user=from_user, conversation_id=new_conversation_id, - user_question=content, - ai_answer=ai_answer, + group=from_group, + hour=hour_key, + content=json.dumps(content_list, ensure_ascii=False), is_processed=True, ) db.add(new_conversation) - logger.info(f"鍒涘缓瀵硅瘽璁板綍: key={user_conversation_key}") + logger.info(f"鍒涘缓鏂扮殑鐢ㄦ埛缇ょ粍灏忔椂瀵硅瘽璁板綍: user={from_user}, group={from_group}, hour={hour_key}, 鍒濆瀵硅瘽杞=1") db.commit() # 鍙戦�丄I鍥炵瓟鍒扮兢鑱� - if ai_answer and ecloud_client.send_group_message( - w_id, from_group, ai_answer - ): + success = False + if ai_answer: + # 瑙f瀽AI鍥炲涓殑@瀛楃 + processed_answer, at_wc_ids = self.parse_at_mentions(ai_answer) + + # 鍙戦�佹秷鎭紝鏈�澶氶噸璇�3娆� + for attempt in range(3): + if at_wc_ids: + # 濡傛灉鏈堾瀹㈡湇锛屼娇鐢ㄧ兢鑱夽鎺ュ彛 + logger.info(f"浣跨敤缇よ亰@鎺ュ彛鍙戦�佹秷鎭�: at_wc_ids={at_wc_ids}") + if ecloud_client.send_group_at_message( + w_id, from_group, processed_answer, at_wc_ids + ): + success = True + break + else: + # 鏅�氱兢鑱婃秷鎭� + logger.info("浣跨敤鏅�氱兢鑱婃帴鍙e彂閫佹秷鎭�") + if ecloud_client.send_group_message( + w_id, from_group, processed_answer + ): + success = True + break + + logger.warning(f"鍙戦�丄I鍥炵瓟澶辫触锛屽皾璇曢噸璇� ({attempt + 1}/3): from_user={from_user}") + if attempt < 2: # 涓嶆槸鏈�鍚庝竴娆″皾璇曪紝绛夊緟涓�娈垫椂闂村啀閲嶈瘯 + time.sleep(2 ** attempt) # 鎸囨暟閫�閬� + + if success: # 鏇存柊鍙戦�佺姸鎬� conversation = ( db.query(Conversation) .filter( - Conversation.user_conversation_key == user_conversation_key + Conversation.from_user == from_user, + Conversation.conversation_id == new_conversation_id, + Conversation.group == from_group, + Conversation.hour == hour_key ) .first() ) if conversation: conversation.is_sent = True + conversation.sent_time = current_time db.commit() logger.info(f"娑堟伅澶勭悊瀹屾垚: from_user={from_user}") @@ -232,9 +343,6 @@ else: logger.error(f"鍙戦�丄I鍥炵瓟澶辫触: from_user={from_user}") return False - - finally: - db.close() except Exception as e: logger.error(f"澶勭悊娑堟伅寮傚父: from_user={from_user}, error={str(e)}") -- Gitblit v1.9.1