From 0ed1530b4049944f44554ba4711acb823a211854 Mon Sep 17 00:00:00 2001 From: yj <2077506045@qq.com> Date: 星期二, 26 八月 2025 10:37:23 +0800 Subject: [PATCH] 1.新增结束语判断;2.增加@客服后静默;3.增加判断群已静默后AI已回复也不发消息 --- app/services/message_processor.py | 277 +++++++++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 247 insertions(+), 30 deletions(-) diff --git a/app/services/message_processor.py b/app/services/message_processor.py index 12aaea2..0b26bff 100644 --- a/app/services/message_processor.py +++ b/app/services/message_processor.py @@ -5,6 +5,7 @@ import json import time import re +import xml.etree.ElementTree as ET from typing import Dict, Any, Optional, List, Tuple from datetime import datetime from sqlalchemy.orm import Session @@ -16,6 +17,9 @@ from app.services.redis_queue import redis_queue from app.services.ecloud_client import ecloud_client from app.services.dify_client import dify_client +from app.services.friend_ignore_service import friend_ignore_service +from app.services.silence_service import silence_service +from app.services.group_stats_service import group_stats_service from config import settings @@ -24,6 +28,71 @@ def __init__(self): pass + + def extract_refer_message_content(self, callback_data: Dict[str, Any]) -> str: + """ + 鎻愬彇寮曠敤娑堟伅鐨勫唴瀹癸紙message_type涓�80014锛� + + Args: + callback_data: 鍥炶皟鏁版嵁 + + Returns: + 缁勫悎鍚庣殑娑堟伅鍐呭锛坈ontent + title锛屼腑闂寸┖涓�琛岋級 + """ + try: + data = callback_data.get("data", {}) + content = data.get("content", "") + title = data.get("title", "") + + # 瑙f瀽XML鍐呭锛屾彁鍙杕sg>appmsg>refermsg>content鏍囩涓殑鍐呭 + refer_content = "" + xml_title = "" + try: + # 瑙f瀽XML + root = ET.fromstring(content) + + # 鏌ユ壘msg>appmsg>refermsg>content璺緞 + appmsg = root.find("appmsg") + if appmsg is not None: + # 鎻愬彇XML涓殑title锛堝鏋滃瓨鍦級 + title_element = appmsg.find("title") + if title_element is not None and title_element.text: + xml_title = title_element.text.strip() + + # 鎻愬彇寮曠敤娑堟伅鍐呭 + refermsg = appmsg.find("refermsg") + if refermsg is not None: + content_element = refermsg.find("content") + if content_element is not None and content_element.text: + refer_content = content_element.text.strip() + + except ET.ParseError as e: + logger.warning(f"XML瑙f瀽澶辫触: {str(e)}, content={content}") + # 濡傛灉XML瑙f瀽澶辫触锛屼娇鐢ㄥ師濮媍ontent + refer_content = content + + # 纭畾鏈�缁堜娇鐢ㄧ殑title锛氫紭鍏堜娇鐢╔ML涓殑title锛屽叾娆′娇鐢╠ata.title + final_title = xml_title if xml_title else title + + # 缁勫悎鍐呭锛歳efer_content鍦ㄥ墠锛宖inal_title鍦ㄥ悗锛屼腑闂寸┖涓�琛� + if refer_content and final_title: + combined_content = f"{refer_content}\n\n{final_title}" + elif refer_content: + combined_content = refer_content + elif final_title: + combined_content = final_title + else: + combined_content = content # 濡傛灉閮芥病鏈夛紝浣跨敤鍘熷content + + logger.info( + f"寮曠敤娑堟伅鍐呭鎻愬彇瀹屾垚: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}" + ) + return combined_content + + except Exception as e: + logger.error(f"鎻愬彇寮曠敤娑堟伅鍐呭寮傚父: error={str(e)}") + # 寮傚父鎯呭喌涓嬭繑鍥炲師濮媍ontent + return callback_data.get("data", {}).get("content", "") def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]: """ @@ -40,7 +109,7 @@ customer_service_names = settings.customer_service_names # 鏌ユ壘鎵�鏈堾瀛楃鍚庣殑瀹㈡湇鍚嶇О - at_pattern = r'@([^\s]+)' + at_pattern = r"@([^\s]+)" matches = re.findall(at_pattern, ai_answer) valid_at_names = [] @@ -57,10 +126,14 @@ with next(get_db()) as db: for name in valid_at_names: # 鏍规嵁nick_name鏌ユ壘鑱旂郴浜� - contact = db.query(Contact).filter(Contact.nick_name == name).first() + contact = ( + db.query(Contact).filter(Contact.nick_name == name).first() + ) if contact: at_wc_ids.append(contact.wc_id) - logger.info(f"鎵惧埌瀹㈡湇鑱旂郴浜�: name={name}, wc_id={contact.wc_id}") + logger.info( + f"鎵惧埌瀹㈡湇鑱旂郴浜�: name={name}, wc_id={contact.wc_id}" + ) else: logger.warning(f"鏈壘鍒板鏈嶈仈绯讳汉: name={name}") @@ -69,6 +142,28 @@ except Exception as e: logger.error(f"瑙f瀽@瀛楃寮傚父: error={str(e)}") return ai_answer, [] + + def is_end_str(self, ai_answer: str) -> bool: + """ + 瑙f瀽AI鍥炲鍒ゆ柇鏄惁鏄粨鏉熷瓧绗︿覆 + + Args: + ai_answer: AI鍥炲鍐呭 + + Returns: + """ + try: + # 鑾峰彇閰嶇疆鐨勭粨鏉熷瓧绗︿覆鍒楄〃 + end_str_list = settings.end_str_list + for end_str in end_str_list: + substrings = end_str.split(',') + # 妫�鏌� match_str 鏄惁鍖呭惈姣忎竴涓瓙瀛楃涓� + if all(sub in ai_answer for sub in substrings): + return True + return False + except Exception as e: + logger.error(f"瑙f瀽缁撴潫瀛楃涓插紓甯�: error={str(e)}") + return False def is_valid_group_message(self, callback_data: Dict[str, Any]) -> bool: """ @@ -80,9 +175,10 @@ Returns: 鏄惁鏄湁鏁堢殑缇よ亰娑堟伅 """ - # 妫�鏌ユ秷鎭被鍨嬫槸鍚︽槸缇よ亰娑堟伅锛�80001锛� + # 妫�鏌ユ秷鎭被鍨嬫槸鍚︽槸缇よ亰娑堟伅锛�80001锛夋垨寮曠敤娑堟伅锛�80014锛� message_type = callback_data.get("messageType") - if message_type != "80001": + print(f"data: {callback_data}") + if message_type not in ["80001", "80014"]: logger.info(f"蹇界暐闈炵兢鑱婃秷鎭�: messageType={message_type}") return False @@ -101,6 +197,42 @@ logger.warning(f"娑堟伅缂哄皯蹇呰瀛楁: data={data}") return False + # 鑾峰彇鐢ㄦ埛鍜岀兢缁勪俊鎭� + from_user = data.get("fromUser") + from_group = data.get("fromGroup") + + # 妫�鏌ュ彂閫佽�呮槸鍚﹀湪濂藉弸蹇界暐鍒楄〃涓� + is_friend_ignored = friend_ignore_service.is_friend_ignored(from_user) + + if is_friend_ignored: + logger.info( + f"蹇界暐濂藉弸鍙戦�佺殑娑堟伅: fromUser={from_user}, fromGroup={from_group}" + ) + # 缁熻琚拷鐣ョ殑濂藉弸鍙戣█娆℃暟锛堢‘淇濊蹇界暐鐨勫ソ鍙嬫秷鎭篃绾冲叆缁熻锛� + group_stats_service.increment_user_message_count(from_group, from_user) + # 婵�娲绘垨寤堕暱璇ョ兢缁勭殑闈欓粯妯″紡 + if silence_service.is_silence_active(from_group): + # 濡傛灉璇ョ兢缁勯潤榛樻ā寮忓凡婵�娲伙紝寤堕暱鏃堕棿 + silence_service.extend_silence_mode(from_group) + logger.info( + f"濂藉弸娑堟伅琚拷鐣ワ紝缇ょ粍闈欓粯妯″紡鏃堕棿宸插埛鏂�: fromUser={from_user}, fromGroup={from_group}" + ) + else: + # 濡傛灉璇ョ兢缁勯潤榛樻ā寮忔湭婵�娲伙紝婵�娲婚潤榛樻ā寮� + silence_service.activate_silence_mode(from_group) + logger.info( + f"濂藉弸娑堟伅琚拷鐣ワ紝缇ょ粍闈欓粯妯″紡宸叉縺娲�: fromUser={from_user}, fromGroup={from_group}" + ) + return False + + # # 缁熻姝e父澶勭悊鐨勫ソ鍙嬪彂瑷�娆℃暟 + # group_stats_service.increment_user_message_count(from_group, from_user) + + # 妫�鏌ヨ缇ょ粍鐨勯潤榛樻ā寮忔槸鍚︽縺娲伙紙鍦ㄥソ鍙嬪拷鐣ユ鏌ヤ箣鍚庯級 + if silence_service.is_silence_active(from_group): + logger.info(f"缇ょ粍闈欓粯妯″紡婵�娲讳腑锛屽拷鐣ユ秷鎭�: fromGroup={from_group}") + return False + return True def enqueue_callback_message(self, callback_data: Dict[str, Any]) -> bool: @@ -117,11 +249,33 @@ if not self.is_valid_group_message(callback_data): return False - data = callback_data.get("data", {}) - from_user = data.get("fromUser") + # 瀵煎叆娑堟伅鑱氬悎鏈嶅姟锛堝欢杩熷鍏ラ伩鍏嶅惊鐜緷璧栵級 + from app.services.message_aggregator import message_aggregator - # 灏嗘暣涓洖璋冨唴瀹硅浆鎴怞SON瀛楃涓插瓨鍌ㄥ埌Redis闃熷垪 - return redis_queue.enqueue_message(from_user, callback_data) + # 灏濊瘯灏嗘秷鎭坊鍔犲埌鑱氬悎涓� + should_process_immediately, aggregated_data = ( + message_aggregator.add_message_to_aggregation(callback_data) + ) + + if should_process_immediately: + # 闇�瑕佺珛鍗冲鐞嗭紙涓嶈仛鍚堟垨鑱氬悎瓒呮椂锛� + data = ( + aggregated_data.get("data", {}) + if aggregated_data + else callback_data.get("data", {}) + ) + from_user = data.get("fromUser") + + # 灏嗘秷鎭姞鍏edis闃熷垪 + return redis_queue.enqueue_message( + from_user, aggregated_data or callback_data + ) + else: + # 娑堟伅宸茶鑱氬悎锛屼笉闇�瑕佺珛鍗冲鐞� + logger.info( + f"娑堟伅宸叉坊鍔犲埌鑱氬悎闃熷垪锛岀瓑寰呰仛鍚堝鐞�: fromUser={callback_data.get('data', {}).get('fromUser')}" + ) + return True def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool: """ @@ -197,8 +351,17 @@ from_group = data.get("fromGroup") content = data.get("content") w_id = data.get("wId") + message_type = message_data.get("messageType") - logger.info(f"寮�濮嬪鐞嗘秷鎭�: from_user={from_user}, from_group={from_group}") + logger.info( + f"寮�濮嬪鐞嗘秷鎭�: from_user={from_user}, from_group={from_group}, message_type={message_type}" + ) + + # 鏍规嵁娑堟伅绫诲瀷澶勭悊鍐呭 + if message_type == "80014": + # 寮曠敤娑堟伅锛岄渶瑕佹彁鍙朮ML涓殑鍐呭骞朵笌title缁勫悎 + content = self.extract_refer_message_content(message_data) + logger.info(f"寮曠敤娑堟伅鍐呭澶勭悊瀹屾垚: content_length={len(content)}") # 浣跨敤涓婁笅鏂囩鐞嗗櫒纭繚鏁版嵁搴撲細璇濇纭鐞� with next(get_db()) as db: @@ -207,13 +370,29 @@ logger.error(f"鑱旂郴浜轰俊鎭鐞嗗け璐�: from_group={from_group}") return False - # 3.2 鑾峰彇鐢ㄦ埛鍦ㄥ綋鍓嶇兢缁勭殑conversation_id + # 3.2 鑾峰彇缇ょ粍涓彂瑷�娆℃暟鏈�澶氱殑鐢ㄦ埛鏄电О + most_active_nickname = ( + group_stats_service.get_most_active_user_nickname(from_group) + ) + logger.info( + f"缇ょ粍鏈�娲昏穬鐢ㄦ埛鏄电О: group={from_group}, nickname={most_active_nickname}" + ) + + # 3.3 鑾峰彇鐢ㄦ埛鍦ㄥ綋鍓嶇兢缁勭殑conversation_id conversation_id = redis_queue.get_conversation_id(from_user, from_group) - # 璋冪敤Dify鎺ュ彛鍙戦�佹秷鎭� - dify_response = dify_client.send_chat_message( - query=content, user=from_user, conversation_id=conversation_id + # 璋冪敤Dify鎺ュ彛鍙戦�佹秷鎭紙鏍规嵁閰嶇疆閫夋嫨妯″紡锛� + dify_response = dify_client.send_message( + query=content, + user=from_user, + conversation_id=conversation_id, + nick_name=most_active_nickname, ) + + if silence_service.is_silence_active(from_group): + # 鍥炲鍓嶅垽鏂槸鍚︽縺娲婚潤榛橈紝宸查潤榛樺垯涓嶅洖澶� + logger.error(f"Dify宸插搷搴斾絾缇ょ粍宸查潤榛橈細from_user={from_user}") + return False if not dify_response: logger.error(f"Dify鍝嶅簲澶辫触: from_user={from_user}") @@ -225,9 +404,11 @@ # 鏇存柊Redis涓殑conversation_id锛堝熀浜庣敤鎴�+缇ょ粍锛� if new_conversation_id: - redis_queue.set_conversation_id(from_user, new_conversation_id, from_group) + redis_queue.set_conversation_id( + from_user, new_conversation_id, from_group, settings.silence_duration_minutes * 60 + ) - # 3.3 淇濆瓨瀵硅瘽璁板綍鍒版暟鎹簱 + # 3.4 淇濆瓨瀵硅瘽璁板綍鍒版暟鎹簱 # 鎸夌敤鎴枫�佺兢缁勫拰灏忔椂鍒嗙粍瀵硅瘽璁板綍 current_time = datetime.now() hour_key = current_time.strftime("%Y%m%d_%H") @@ -239,7 +420,7 @@ Conversation.from_user == from_user, Conversation.conversation_id == new_conversation_id, Conversation.group == from_group, - Conversation.hour == hour_key + Conversation.hour == hour_key, ) .first() ) @@ -254,20 +435,23 @@ content_list = [] # 杩藉姞鏂扮殑瀵硅瘽鍐呭 - content_list.append({ - "user": content, - "ai": ai_answer - }) + content_list.append({"user": content, "ai": ai_answer}) # 鏇存柊璁板綍 - existing_conversation.content = json.dumps(content_list, ensure_ascii=False) + existing_conversation.content = json.dumps( + content_list, ensure_ascii=False + ) existing_conversation.is_processed = True - logger.info(f"杩藉姞鍒板綋鍓嶇敤鎴风兢缁勫皬鏃跺璇濊褰�: user={from_user}, group={from_group}, hour={hour_key}, 瀵硅瘽杞={len(content_list)}") + logger.info( + f"杩藉姞鍒板綋鍓嶇敤鎴风兢缁勫皬鏃跺璇濊褰�: user={from_user}, group={from_group}, hour={hour_key}, 瀵硅瘽杞={len(content_list)}" + ) except json.JSONDecodeError as e: logger.error(f"瑙f瀽鐜版湁瀵硅瘽鍐呭JSON澶辫触: {str(e)}, 閲嶆柊鍒涘缓") # 濡傛灉JSON瑙f瀽澶辫触锛岄噸鏂板垱寤篶ontent content_list = [{"user": content, "ai": ai_answer}] - existing_conversation.content = json.dumps(content_list, ensure_ascii=False) + existing_conversation.content = json.dumps( + content_list, ensure_ascii=False + ) existing_conversation.is_processed = True else: # 鍒涘缓鏂拌褰� - 鏂扮殑鐢ㄦ埛缇ょ粍灏忔椂瀵硅瘽鎴栭娆″璇濓紝浣跨敤JSON鏍煎紡瀛樺偍瀵硅瘽鍐呭 @@ -281,7 +465,9 @@ is_processed=True, ) db.add(new_conversation) - logger.info(f"鍒涘缓鏂扮殑鐢ㄦ埛缇ょ粍灏忔椂瀵硅瘽璁板綍: user={from_user}, group={from_group}, hour={hour_key}, 鍒濆瀵硅瘽杞=1") + logger.info( + f"鍒涘缓鏂扮殑鐢ㄦ埛缇ょ粍灏忔椂瀵硅瘽璁板綍: user={from_user}, group={from_group}, hour={hour_key}, 鍒濆瀵硅瘽杞=1" + ) db.commit() @@ -290,7 +476,8 @@ if ai_answer: # 瑙f瀽AI鍥炲涓殑@瀛楃 processed_answer, at_wc_ids = self.parse_at_mentions(ai_answer) - + # 鍒ゆ柇AI鍥炲鏄惁鏄粨鏉熷瓧绗︿覆 + is_end_str = self.is_end_str(ai_answer) # 鍙戦�佹秷鎭紝鏈�澶氶噸璇�3娆� for attempt in range(3): if at_wc_ids: @@ -299,6 +486,19 @@ if ecloud_client.send_group_at_message( w_id, from_group, processed_answer, at_wc_ids ): + # @鍚庤Е鍙戦潤榛樻ā寮� + if silence_service.is_silence_active(from_group): + # 濡傛灉璇ョ兢缁勯潤榛樻ā寮忓凡婵�娲伙紝寤堕暱鏃堕棿 + silence_service.extend_silence_mode(from_group) + logger.info( + f"AI鍥炲@瀹㈡湇锛岀兢缁勯潤榛樻ā寮忔椂闂村凡鍒锋柊: fromUser={from_user}, fromGroup={from_group}" + ) + else: + # 濡傛灉璇ョ兢缁勯潤榛樻ā寮忔湭婵�娲伙紝婵�娲婚潤榛樻ā寮� + silence_service.activate_silence_mode(from_group) + logger.info( + f"AI鍥炲@瀹㈡湇锛岀兢缁勯潤榛樻ā寮忓凡婵�娲�: fromUser={from_user}, fromGroup={from_group}" + ) success = True break else: @@ -310,10 +510,27 @@ success = True break - logger.warning(f"鍙戦�丄I鍥炵瓟澶辫触锛屽皾璇曢噸璇� ({attempt + 1}/3): from_user={from_user}") + logger.warning( + f"鍙戦�丄I鍥炵瓟澶辫触锛屽皾璇曢噸璇� ({attempt + 1}/3): from_user={from_user}" + ) if attempt < 2: # 涓嶆槸鏈�鍚庝竴娆″皾璇曪紝绛夊緟涓�娈垫椂闂村啀閲嶈瘯 - time.sleep(2 ** attempt) # 鎸囨暟閫�閬� - + time.sleep(2**attempt) # 鎸囨暟閫�閬� + + # AI鍥炲缁撴潫瀛楃涓茶Е鍙戦潤榛樻ā寮� + if is_end_str: + if silence_service.is_silence_active(from_group): + # 濡傛灉璇ョ兢缁勯潤榛樻ā寮忓凡婵�娲伙紝寤堕暱鏃堕棿 + silence_service.extend_silence_mode(from_group) + logger.info( + f"AI鍥炲缁撴潫瀛楃涓诧紝缇ょ粍闈欓粯妯″紡鏃堕棿宸插埛鏂�: fromUser={from_user}, fromGroup={from_group}" + ) + else: + # 濡傛灉璇ョ兢缁勯潤榛樻ā寮忔湭婵�娲伙紝婵�娲婚潤榛樻ā寮� + silence_service.activate_silence_mode(from_group) + logger.info( + f"AI鍥炲缁撴潫瀛楃涓诧紝缇ょ粍闈欓粯妯″紡宸叉縺娲�: fromUser={from_user}, fromGroup={from_group}" + ) + if success: # 鏇存柊鍙戦�佺姸鎬� conversation = ( @@ -322,7 +539,7 @@ Conversation.from_user == from_user, Conversation.conversation_id == new_conversation_id, Conversation.group == from_group, - Conversation.hour == hour_key + Conversation.hour == hour_key, ) .first() ) -- Gitblit v1.9.1