From f28ac0166536a2a4b68cac685a41ea667f60f7e9 Mon Sep 17 00:00:00 2001 From: yj <2077506045@qq.com> Date: 星期三, 03 九月 2025 14:43:03 +0800 Subject: [PATCH] 兼容企业微信 --- app/services/message_processor.py | 410 +++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 files changed, 371 insertions(+), 39 deletions(-) diff --git a/app/services/message_processor.py b/app/services/message_processor.py index 11308fc..8a79a72 100644 --- a/app/services/message_processor.py +++ b/app/services/message_processor.py @@ -3,7 +3,11 @@ """ import json -from typing import Dict, Any, Optional +import time +import re +import xml.etree.ElementTree as ET +from typing import Dict, Any, Optional, List, Tuple +from datetime import datetime from sqlalchemy.orm import Session from loguru import logger @@ -13,6 +17,10 @@ from app.services.redis_queue import redis_queue from app.services.ecloud_client import ecloud_client from app.services.dify_client import dify_client +from app.services.friend_ignore_service import friend_ignore_service +from app.services.silence_service import silence_service +from app.services.group_stats_service import group_stats_service +from config import settings class MessageProcessor: @@ -20,6 +28,149 @@ def __init__(self): pass + + def extract_refer_message_content(self, callback_data: Dict[str, Any]) -> str: + """ + 鎻愬彇寮曠敤娑堟伅鐨勫唴瀹癸紙message_type涓�80014锛� + + Args: + callback_data: 鍥炶皟鏁版嵁 + + Returns: + 缁勫悎鍚庣殑娑堟伅鍐呭锛坈ontent + title锛屼腑闂寸┖涓�琛岋級 + """ + try: + data = callback_data.get("data", {}) + content = data.get("content", "") + title = data.get("title", "") + + # 瑙f瀽XML鍐呭锛屾彁鍙杕sg>appmsg>refermsg>content鏍囩涓殑鍐呭 + refer_content = "" + xml_title = "" + try: + # 瑙f瀽XML + root = ET.fromstring(content) + + # 鏌ユ壘msg>appmsg>refermsg>content璺緞 + appmsg = root.find("appmsg") + if appmsg is not None: + # 鎻愬彇XML涓殑title锛堝鏋滃瓨鍦級 + title_element = appmsg.find("title") + if title_element is not None and title_element.text: + xml_title = title_element.text.strip() + + # 鎻愬彇寮曠敤娑堟伅鍐呭 + refermsg = appmsg.find("refermsg") + if refermsg is not None: + content_element = refermsg.find("content") + if content_element is not None and content_element.text: + refer_content = content_element.text.strip() + + except ET.ParseError as e: + logger.warning(f"XML瑙f瀽澶辫触: {str(e)}, content={content}") + # 濡傛灉XML瑙f瀽澶辫触锛屼娇鐢ㄥ師濮媍ontent + refer_content = content + + # 纭畾鏈�缁堜娇鐢ㄧ殑title锛氫紭鍏堜娇鐢╔ML涓殑title锛屽叾娆′娇鐢╠ata.title + final_title = xml_title if xml_title else title + + # 缁勫悎鍐呭锛歳efer_content鍦ㄥ墠锛宖inal_title鍦ㄥ悗锛屼腑闂寸┖涓�琛� + if refer_content and final_title: + combined_content = f"{refer_content}\n\n{final_title}" + elif refer_content: + combined_content = refer_content + elif final_title: + combined_content = final_title + else: + combined_content = content # 濡傛灉閮芥病鏈夛紝浣跨敤鍘熷content + + logger.info( + f"寮曠敤娑堟伅鍐呭鎻愬彇瀹屾垚: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}" + ) + return combined_content + + except Exception as e: + logger.error(f"鎻愬彇寮曠敤娑堟伅鍐呭寮傚父: error={str(e)}") + # 寮傚父鎯呭喌涓嬭繑鍥炲師濮媍ontent + return callback_data.get("data", {}).get("content", "") + + def parse_at_mentions(self, ai_answer: str, from_user: str) -> Tuple[str, List[str]]: + """ + 瑙f瀽AI鍥炲涓殑@瀛楃锛屾彁鍙栧鏈嶅悕绉� + + Args: + ai_answer: AI鍥炲鍐呭 + + Returns: + (澶勭悊鍚庣殑娑堟伅鍐呭, 闇�瑕丂鐨勫鏈峸cid鍒楄〃) + """ + try: + # 鑾峰彇閰嶇疆鐨勫鏈嶅悕绉板垪琛� + customer_service_names = settings.customer_service_names + + # 鏌ユ壘鎵�鏈堾瀛楃鍚庣殑瀹㈡湇鍚嶇О + at_pattern = r"@([^\s]+)" + matches = re.findall(at_pattern, ai_answer) + + valid_at_names = [] + at_wc_ids = [] + + for match in matches: + # 妫�鏌ユ槸鍚﹀湪閰嶇疆鐨勫鏈嶅悕绉板垪琛ㄤ腑 + if match in customer_service_names: + valid_at_names.append(match) + logger.info(f"鍙戠幇鏈夋晥鐨凘瀹㈡湇鍚嶇О: {match}") + + # 濡傛灉鏈夋湁鏁堢殑@瀹㈡湇鍚嶇О锛屾煡璇㈡暟鎹簱鑾峰彇wcid + if valid_at_names: + with next(get_db()) as db: + for name in valid_at_names: + # 鏍规嵁nick_name鏌ユ壘鑱旂郴浜� + contact = ( + db.query(Contact).filter(Contact.nick_name == name).first() + ) + if contact: + # 濡傛灉from_user鍖呭惈@openim琛ㄧず鏄紒涓氬井淇★紝浣跨敤work_wc_id + if "@openim" in from_user: + at_wc_ids.append(contact.work_wc_id) + logger.info( + f"鎵惧埌瀹㈡湇鑱旂郴浜�-寰俊: name={name}, wc_id={contact.work_wc_id}" + ) + else: + at_wc_ids.append(contact.wc_id) + logger.info( + f"鎵惧埌瀹㈡湇鑱旂郴浜�-浼佷笟寰俊: name={name}, wc_id={contact.wc_id}" + ) + else: + logger.warning(f"鏈壘鍒板鏈嶈仈绯讳汉: name={name}") + + return ai_answer, at_wc_ids + + except Exception as e: + logger.error(f"瑙f瀽@瀛楃寮傚父: error={str(e)}") + return ai_answer, [] + + def is_end_str(self, ai_answer: str) -> bool: + """ + 瑙f瀽AI鍥炲鍒ゆ柇鏄惁鏄粨鏉熷瓧绗︿覆 + + Args: + ai_answer: AI鍥炲鍐呭 + + Returns: + """ + try: + # 鑾峰彇閰嶇疆鐨勭粨鏉熷瓧绗︿覆鍒楄〃 + end_str_list = settings.end_str_list + for end_str in end_str_list: + substrings = end_str.split(',') + # 妫�鏌� match_str 鏄惁鍖呭惈姣忎竴涓瓙瀛楃涓� + if all(sub in ai_answer for sub in substrings): + return True + return False + except Exception as e: + logger.error(f"瑙f瀽缁撴潫瀛楃涓插紓甯�: error={str(e)}") + return False def is_valid_group_message(self, callback_data: Dict[str, Any]) -> bool: """ @@ -31,9 +182,10 @@ Returns: 鏄惁鏄湁鏁堢殑缇よ亰娑堟伅 """ - # 妫�鏌ユ秷鎭被鍨嬫槸鍚︽槸缇よ亰娑堟伅锛�80001锛� + # 妫�鏌ユ秷鎭被鍨嬫槸鍚︽槸缇よ亰娑堟伅锛�80001锛夋垨寮曠敤娑堟伅锛�80014锛� message_type = callback_data.get("messageType") - if message_type != "80001": + print(f"data: {callback_data}") + if message_type not in ["80001", "80014"]: logger.info(f"蹇界暐闈炵兢鑱婃秷鎭�: messageType={message_type}") return False @@ -52,6 +204,42 @@ logger.warning(f"娑堟伅缂哄皯蹇呰瀛楁: data={data}") return False + # 鑾峰彇鐢ㄦ埛鍜岀兢缁勪俊鎭� + from_user = data.get("fromUser") + from_group = data.get("fromGroup") + + # 妫�鏌ュ彂閫佽�呮槸鍚﹀湪濂藉弸蹇界暐鍒楄〃涓紙浼犲叆缇ょ粍ID鐢ㄤ簬娴嬭瘯缇ょ粍妫�鏌ワ級 + is_friend_ignored = friend_ignore_service.is_friend_ignored(from_user, from_group) + + if is_friend_ignored: + logger.info( + f"蹇界暐濂藉弸鍙戦�佺殑娑堟伅: fromUser={from_user}, fromGroup={from_group}" + ) + # 缁熻琚拷鐣ョ殑濂藉弸鍙戣█娆℃暟锛堢‘淇濊蹇界暐鐨勫ソ鍙嬫秷鎭篃绾冲叆缁熻锛� + group_stats_service.increment_user_message_count(from_group, from_user) + # 婵�娲绘垨寤堕暱璇ョ兢缁勭殑闈欓粯妯″紡 + if silence_service.is_silence_active(from_group): + # 濡傛灉璇ョ兢缁勯潤榛樻ā寮忓凡婵�娲伙紝寤堕暱鏃堕棿 + silence_service.extend_silence_mode(from_group) + logger.info( + f"濂藉弸娑堟伅琚拷鐣ワ紝缇ょ粍闈欓粯妯″紡鏃堕棿宸插埛鏂�: fromUser={from_user}, fromGroup={from_group}" + ) + else: + # 濡傛灉璇ョ兢缁勯潤榛樻ā寮忔湭婵�娲伙紝婵�娲婚潤榛樻ā寮� + silence_service.activate_silence_mode(from_group) + logger.info( + f"濂藉弸娑堟伅琚拷鐣ワ紝缇ょ粍闈欓粯妯″紡宸叉縺娲�: fromUser={from_user}, fromGroup={from_group}" + ) + return False + + # # 缁熻姝e父澶勭悊鐨勫ソ鍙嬪彂瑷�娆℃暟 + # group_stats_service.increment_user_message_count(from_group, from_user) + + # 妫�鏌ヨ缇ょ粍鐨勯潤榛樻ā寮忔槸鍚︽縺娲伙紙鍦ㄥソ鍙嬪拷鐣ユ鏌ヤ箣鍚庯級 + if silence_service.is_silence_active(from_group): + logger.info(f"缇ょ粍闈欓粯妯″紡婵�娲讳腑锛屽拷鐣ユ秷鎭�: fromGroup={from_group}") + return False + return True def enqueue_callback_message(self, callback_data: Dict[str, Any]) -> bool: @@ -68,11 +256,33 @@ if not self.is_valid_group_message(callback_data): return False - data = callback_data.get("data", {}) - from_user = data.get("fromUser") + # 瀵煎叆娑堟伅鑱氬悎鏈嶅姟锛堝欢杩熷鍏ラ伩鍏嶅惊鐜緷璧栵級 + from app.services.message_aggregator import message_aggregator - # 灏嗘暣涓洖璋冨唴瀹硅浆鎴怞SON瀛楃涓插瓨鍌ㄥ埌Redis闃熷垪 - return redis_queue.enqueue_message(from_user, callback_data) + # 灏濊瘯灏嗘秷鎭坊鍔犲埌鑱氬悎涓� + should_process_immediately, aggregated_data = ( + message_aggregator.add_message_to_aggregation(callback_data) + ) + + if should_process_immediately: + # 闇�瑕佺珛鍗冲鐞嗭紙涓嶈仛鍚堟垨鑱氬悎瓒呮椂锛� + data = ( + aggregated_data.get("data", {}) + if aggregated_data + else callback_data.get("data", {}) + ) + from_user = data.get("fromUser") + + # 灏嗘秷鎭姞鍏edis闃熷垪 + return redis_queue.enqueue_message( + from_user, aggregated_data or callback_data + ) + else: + # 娑堟伅宸茶鑱氬悎锛屼笉闇�瑕佺珛鍗冲鐞� + logger.info( + f"娑堟伅宸叉坊鍔犲埌鑱氬悎闃熷垪锛岀瓑寰呰仛鍚堝鐞�: fromUser={callback_data.get('data', {}).get('fromUser')}" + ) + return True def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool: """ @@ -148,25 +358,50 @@ from_group = data.get("fromGroup") content = data.get("content") w_id = data.get("wId") + message_type = message_data.get("messageType") - logger.info(f"寮�濮嬪鐞嗘秷鎭�: from_user={from_user}, from_group={from_group}") + logger.info( + f"寮�濮嬪鐞嗘秷鎭�: from_user={from_user}, from_group={from_group}, message_type={message_type}" + ) - # 鑾峰彇鏁版嵁搴撲細璇� - db = next(get_db()) + # 鏍规嵁娑堟伅绫诲瀷澶勭悊鍐呭 + if message_type == "80014": + # 寮曠敤娑堟伅锛岄渶瑕佹彁鍙朮ML涓殑鍐呭骞朵笌title缁勫悎 + content = self.extract_refer_message_content(message_data) + logger.info(f"寮曠敤娑堟伅鍐呭澶勭悊瀹屾垚: content_length={len(content)}") - try: + # 浣跨敤涓婁笅鏂囩鐞嗗櫒纭繚鏁版嵁搴撲細璇濇纭鐞� + with next(get_db()) as db: # 3.1 纭繚鑱旂郴浜轰俊鎭瓨鍦� if not self.ensure_contact_exists(from_group, w_id, db): logger.error(f"鑱旂郴浜轰俊鎭鐞嗗け璐�: from_group={from_group}") return False - # 3.2 鑾峰彇鐢ㄦ埛鐨刢onversation_id - conversation_id = redis_queue.get_conversation_id(from_user) + # 3.2 鑾峰彇缇ょ粍涓彂瑷�娆℃暟鏈�澶氱殑鐢ㄦ埛鏄电О + # most_active_nickname = ( + # group_stats_service.get_most_active_user_nickname(from_group) + # ) + # logger.info( + # f"缇ょ粍鏈�娲昏穬鐢ㄦ埛鏄电О: group={from_group}, nickname={most_active_nickname}" + # ) + # 鑾峰彇榛樿瀹㈡湇鍚嶇О + nickname = settings.customer_service_default_name - # 璋冪敤Dify鎺ュ彛鍙戦�佹秷鎭� - dify_response = dify_client.send_chat_message( - query=content, user=from_user, conversation_id=conversation_id + # 3.3 鑾峰彇鐢ㄦ埛鍦ㄥ綋鍓嶇兢缁勭殑conversation_id + conversation_id = redis_queue.get_conversation_id(from_user, from_group) + + # 璋冪敤Dify鎺ュ彛鍙戦�佹秷鎭紙鏍规嵁閰嶇疆閫夋嫨妯″紡锛� + dify_response = dify_client.send_message( + query=content, + user=from_user, + conversation_id=conversation_id, + nick_name=nickname, ) + + if silence_service.is_silence_active(from_group): + # 鍥炲鍓嶅垽鏂槸鍚︽縺娲婚潤榛橈紝宸查潤榛樺垯涓嶅洖澶� + logger.error(f"Dify宸插搷搴斾絾缇ょ粍宸查潤榛橈細from_user={from_user}") + return False if not dify_response: logger.error(f"Dify鍝嶅簲澶辫触: from_user={from_user}") @@ -176,55 +411,155 @@ ai_answer = dify_response.get("answer", "") new_conversation_id = dify_response.get("conversation_id", "") - # 鏇存柊Redis涓殑conversation_id + # 鏇存柊Redis涓殑conversation_id锛堝熀浜庣敤鎴�+缇ょ粍锛� if new_conversation_id: - redis_queue.set_conversation_id(from_user, new_conversation_id) + redis_queue.set_conversation_id( + from_user, new_conversation_id, from_group, settings.silence_duration_minutes * 60 + ) - # 3.3 淇濆瓨瀵硅瘽璁板綍鍒版暟鎹簱 - user_conversation_key = f"{from_user}_{new_conversation_id}" + # 3.4 淇濆瓨瀵硅瘽璁板綍鍒版暟鎹簱 + # 鎸夌敤鎴枫�佺兢缁勫拰灏忔椂鍒嗙粍瀵硅瘽璁板綍 + current_time = datetime.now() + hour_key = current_time.strftime("%Y%m%d_%H") - # 妫�鏌ユ槸鍚﹀凡瀛樺湪璁板綍 + # 鏌ユ壘褰撳墠鐢ㄦ埛鍦ㄥ綋鍓嶇兢缁勫綋鍓嶅皬鏃剁殑瀵硅瘽璁板綍 existing_conversation = ( db.query(Conversation) - .filter(Conversation.user_conversation_key == user_conversation_key) + .filter( + Conversation.from_user == from_user, + Conversation.conversation_id == new_conversation_id, + Conversation.group == from_group, + Conversation.hour == hour_key, + ) .first() ) if existing_conversation: - # 鏇存柊鐜版湁璁板綍 - existing_conversation.user_question = content - existing_conversation.ai_answer = ai_answer - existing_conversation.is_processed = True - logger.info(f"鏇存柊瀵硅瘽璁板綍: key={user_conversation_key}") + # 鏇存柊鐜版湁璁板綍 - 浣跨敤JSON鏍煎紡杩藉姞瀵硅瘽鍐呭锛堝綋鍓嶇敤鎴峰湪褰撳墠缇ょ粍褰撳墠灏忔椂鐨勫璇濓級 + try: + # 瑙f瀽鐜版湁鐨刢ontent JSON + if existing_conversation.content: + content_list = json.loads(existing_conversation.content) + else: + content_list = [] + + # 杩藉姞鏂扮殑瀵硅瘽鍐呭 + content_list.append({"user": content, "ai": ai_answer}) + + # 鏇存柊璁板綍 + existing_conversation.content = json.dumps( + content_list, ensure_ascii=False + ) + existing_conversation.is_processed = True + logger.info( + f"杩藉姞鍒板綋鍓嶇敤鎴风兢缁勫皬鏃跺璇濊褰�: user={from_user}, group={from_group}, hour={hour_key}, 瀵硅瘽杞={len(content_list)}" + ) + except json.JSONDecodeError as e: + logger.error(f"瑙f瀽鐜版湁瀵硅瘽鍐呭JSON澶辫触: {str(e)}, 閲嶆柊鍒涘缓") + # 濡傛灉JSON瑙f瀽澶辫触锛岄噸鏂板垱寤篶ontent + content_list = [{"user": content, "ai": ai_answer}] + existing_conversation.content = json.dumps( + content_list, ensure_ascii=False + ) + existing_conversation.is_processed = True else: - # 鍒涘缓鏂拌褰� + # 鍒涘缓鏂拌褰� - 鏂扮殑鐢ㄦ埛缇ょ粍灏忔椂瀵硅瘽鎴栭娆″璇濓紝浣跨敤JSON鏍煎紡瀛樺偍瀵硅瘽鍐呭 + content_list = [{"user": content, "ai": ai_answer}] new_conversation = Conversation( - user_conversation_key=user_conversation_key, from_user=from_user, conversation_id=new_conversation_id, - user_question=content, - ai_answer=ai_answer, + group=from_group, + hour=hour_key, + content=json.dumps(content_list, ensure_ascii=False), is_processed=True, ) db.add(new_conversation) - logger.info(f"鍒涘缓瀵硅瘽璁板綍: key={user_conversation_key}") + logger.info( + f"鍒涘缓鏂扮殑鐢ㄦ埛缇ょ粍灏忔椂瀵硅瘽璁板綍: user={from_user}, group={from_group}, hour={hour_key}, 鍒濆瀵硅瘽杞=1" + ) db.commit() # 鍙戦�丄I鍥炵瓟鍒扮兢鑱� - if ai_answer and ecloud_client.send_group_message( - w_id, from_group, ai_answer - ): + success = False + if ai_answer: + # 瑙f瀽AI鍥炲涓殑@瀛楃 + processed_answer, at_wc_ids = self.parse_at_mentions(ai_answer, from_user) + # 鍒ゆ柇AI鍥炲鏄惁鏄粨鏉熷瓧绗︿覆 + is_end_str = self.is_end_str(ai_answer) + # 鍙戦�佹秷鎭紝鏈�澶氶噸璇�3娆� + for attempt in range(3): + if at_wc_ids: + # 濡傛灉鏈堾瀹㈡湇锛屼娇鐢ㄧ兢鑱夽鎺ュ彛 + logger.info(f"浣跨敤缇よ亰@鎺ュ彛鍙戦�佹秷鎭�: at_wc_ids={at_wc_ids}") + if ecloud_client.send_group_at_message( + w_id, from_group, processed_answer, at_wc_ids + ): + # @鍚庤Е鍙戦潤榛樻ā寮� + if silence_service.is_silence_active(from_group): + # 濡傛灉璇ョ兢缁勯潤榛樻ā寮忓凡婵�娲伙紝寤堕暱鏃堕棿 + silence_service.extend_silence_mode(from_group) + logger.info( + f"AI鍥炲@瀹㈡湇锛岀兢缁勯潤榛樻ā寮忔椂闂村凡鍒锋柊: fromUser={from_user}, fromGroup={from_group}" + ) + else: + # 濡傛灉璇ョ兢缁勯潤榛樻ā寮忔湭婵�娲伙紝婵�娲婚潤榛樻ā寮� + flag = silence_service.activate_silence_mode(from_group) + if flag: + logger.info( + f"AI鍥炲@瀹㈡湇锛岀兢缁勯潤榛樻ā寮忓凡婵�娲�: fromUser={from_user}, fromGroup={from_group}" + ) + else: + logger.info( + f"AI鍥炲@瀹㈡湇锛屾湭婵�娲婚潤榛樻ā寮�: fromUser={from_user}, fromGroup={from_group}" + ) + success = True + break + else: + # 鏅�氱兢鑱婃秷鎭� + logger.info("浣跨敤鏅�氱兢鑱婃帴鍙e彂閫佹秷鎭�") + if ecloud_client.send_group_message( + w_id, from_group, processed_answer + ): + success = True + break + + logger.warning( + f"鍙戦�丄I鍥炵瓟澶辫触锛屽皾璇曢噸璇� ({attempt + 1}/3): from_user={from_user}" + ) + if attempt < 2: # 涓嶆槸鏈�鍚庝竴娆″皾璇曪紝绛夊緟涓�娈垫椂闂村啀閲嶈瘯 + time.sleep(2**attempt) # 鎸囨暟閫�閬� + + # AI鍥炲缁撴潫瀛楃涓茶Е鍙戦潤榛樻ā寮� + if is_end_str: + if silence_service.is_silence_active(from_group): + # 濡傛灉璇ョ兢缁勯潤榛樻ā寮忓凡婵�娲伙紝寤堕暱鏃堕棿 + silence_service.extend_silence_mode(from_group) + logger.info( + f"AI鍥炲缁撴潫瀛楃涓诧紝缇ょ粍闈欓粯妯″紡鏃堕棿宸插埛鏂�: fromUser={from_user}, fromGroup={from_group}" + ) + else: + # 濡傛灉璇ョ兢缁勯潤榛樻ā寮忔湭婵�娲伙紝婵�娲婚潤榛樻ā寮� + silence_service.activate_silence_mode(from_group) + logger.info( + f"AI鍥炲缁撴潫瀛楃涓诧紝缇ょ粍闈欓粯妯″紡宸叉縺娲�: fromUser={from_user}, fromGroup={from_group}" + ) + + if success: # 鏇存柊鍙戦�佺姸鎬� conversation = ( db.query(Conversation) .filter( - Conversation.user_conversation_key == user_conversation_key + Conversation.from_user == from_user, + Conversation.conversation_id == new_conversation_id, + Conversation.group == from_group, + Conversation.hour == hour_key, ) .first() ) if conversation: conversation.is_sent = True + conversation.sent_time = current_time db.commit() logger.info(f"娑堟伅澶勭悊瀹屾垚: from_user={from_user}") @@ -232,9 +567,6 @@ else: logger.error(f"鍙戦�丄I鍥炵瓟澶辫触: from_user={from_user}") return False - - finally: - db.close() except Exception as e: logger.error(f"澶勭悊娑堟伅寮傚父: from_user={from_user}, error={str(e)}") -- Gitblit v1.9.1