From 2e391d599d08ea7a7c11442bc2845a1191494c3d Mon Sep 17 00:00:00 2001 From: yj <2077506045@qq.com> Date: 星期四, 07 八月 2025 17:55:58 +0800 Subject: [PATCH] 新增消息聚会,短时间内同一用户发送的消息聚合才后发送给AI --- app/services/message_processor.py | 150 +++++++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 137 insertions(+), 13 deletions(-) diff --git a/app/services/message_processor.py b/app/services/message_processor.py index 12aaea2..5ac473c 100644 --- a/app/services/message_processor.py +++ b/app/services/message_processor.py @@ -5,6 +5,7 @@ import json import time import re +import xml.etree.ElementTree as ET from typing import Dict, Any, Optional, List, Tuple from datetime import datetime from sqlalchemy.orm import Session @@ -16,6 +17,9 @@ from app.services.redis_queue import redis_queue from app.services.ecloud_client import ecloud_client from app.services.dify_client import dify_client +from app.services.friend_ignore_service import friend_ignore_service +from app.services.silence_service import silence_service +from app.services.group_stats_service import group_stats_service from config import settings @@ -24,6 +28,69 @@ def __init__(self): pass + + def extract_refer_message_content(self, callback_data: Dict[str, Any]) -> str: + """ + 鎻愬彇寮曠敤娑堟伅鐨勫唴瀹癸紙message_type涓�80014锛� + + Args: + callback_data: 鍥炶皟鏁版嵁 + + Returns: + 缁勫悎鍚庣殑娑堟伅鍐呭锛坈ontent + title锛屼腑闂寸┖涓�琛岋級 + """ + try: + data = callback_data.get("data", {}) + content = data.get("content", "") + title = data.get("title", "") + + # 瑙f瀽XML鍐呭锛屾彁鍙杕sg>appmsg>refermsg>content鏍囩涓殑鍐呭 + refer_content = "" + xml_title = "" + try: + # 瑙f瀽XML + root = ET.fromstring(content) + + # 鏌ユ壘msg>appmsg>refermsg>content璺緞 + appmsg = root.find("appmsg") + if appmsg is not None: + # 鎻愬彇XML涓殑title锛堝鏋滃瓨鍦級 + title_element = appmsg.find("title") + if title_element is not None and title_element.text: + xml_title = title_element.text.strip() + + # 鎻愬彇寮曠敤娑堟伅鍐呭 + refermsg = appmsg.find("refermsg") + if refermsg is not None: + content_element = refermsg.find("content") + if content_element is not None and content_element.text: + refer_content = content_element.text.strip() + + except ET.ParseError as e: + logger.warning(f"XML瑙f瀽澶辫触: {str(e)}, content={content}") + # 濡傛灉XML瑙f瀽澶辫触锛屼娇鐢ㄥ師濮媍ontent + refer_content = content + + # 纭畾鏈�缁堜娇鐢ㄧ殑title锛氫紭鍏堜娇鐢╔ML涓殑title锛屽叾娆′娇鐢╠ata.title + final_title = xml_title if xml_title else title + + # 缁勫悎鍐呭锛歳efer_content鍦ㄥ墠锛宖inal_title鍦ㄥ悗锛屼腑闂寸┖涓�琛� + if refer_content and final_title: + combined_content = f"{refer_content}\n\n{final_title}" + elif refer_content: + combined_content = refer_content + elif final_title: + combined_content = final_title + else: + combined_content = content # 濡傛灉閮芥病鏈夛紝浣跨敤鍘熷content + + logger.info(f"寮曠敤娑堟伅鍐呭鎻愬彇瀹屾垚: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}") + return combined_content + + except Exception as e: + logger.error(f"鎻愬彇寮曠敤娑堟伅鍐呭寮傚父: error={str(e)}") + # 寮傚父鎯呭喌涓嬭繑鍥炲師濮媍ontent + return callback_data.get("data", {}).get("content", "") def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]: """ @@ -80,9 +147,10 @@ Returns: 鏄惁鏄湁鏁堢殑缇よ亰娑堟伅 """ - # 妫�鏌ユ秷鎭被鍨嬫槸鍚︽槸缇よ亰娑堟伅锛�80001锛� + # 妫�鏌ユ秷鎭被鍨嬫槸鍚︽槸缇よ亰娑堟伅锛�80001锛夋垨寮曠敤娑堟伅锛�80014锛� message_type = callback_data.get("messageType") - if message_type != "80001": + print(f"data: {callback_data}") + if message_type not in ["80001", "80014"]: logger.info(f"蹇界暐闈炵兢鑱婃秷鎭�: messageType={message_type}") return False @@ -101,6 +169,36 @@ logger.warning(f"娑堟伅缂哄皯蹇呰瀛楁: data={data}") return False + # 鑾峰彇鐢ㄦ埛鍜岀兢缁勪俊鎭� + from_user = data.get("fromUser") + from_group = data.get("fromGroup") + + # 妫�鏌ュ彂閫佽�呮槸鍚﹀湪濂藉弸蹇界暐鍒楄〃涓� + is_friend_ignored = friend_ignore_service.is_friend_ignored(from_user) + + if is_friend_ignored: + logger.info(f"蹇界暐濂藉弸鍙戦�佺殑娑堟伅: fromUser={from_user}, fromGroup={from_group}") + # 缁熻琚拷鐣ョ殑濂藉弸鍙戣█娆℃暟锛堢‘淇濊蹇界暐鐨勫ソ鍙嬫秷鎭篃绾冲叆缁熻锛� + group_stats_service.increment_user_message_count(from_group, from_user) + # 婵�娲绘垨寤堕暱璇ョ兢缁勭殑闈欓粯妯″紡 + if silence_service.is_silence_active(from_group): + # 濡傛灉璇ョ兢缁勯潤榛樻ā寮忓凡婵�娲伙紝寤堕暱鏃堕棿 + silence_service.extend_silence_mode(from_group) + logger.info(f"濂藉弸娑堟伅琚拷鐣ワ紝缇ょ粍闈欓粯妯″紡鏃堕棿宸插埛鏂�: fromUser={from_user}, fromGroup={from_group}") + else: + # 濡傛灉璇ョ兢缁勯潤榛樻ā寮忔湭婵�娲伙紝婵�娲婚潤榛樻ā寮� + silence_service.activate_silence_mode(from_group) + logger.info(f"濂藉弸娑堟伅琚拷鐣ワ紝缇ょ粍闈欓粯妯″紡宸叉縺娲�: fromUser={from_user}, fromGroup={from_group}") + return False + + # # 缁熻姝e父澶勭悊鐨勫ソ鍙嬪彂瑷�娆℃暟 + # group_stats_service.increment_user_message_count(from_group, from_user) + + # 妫�鏌ヨ缇ょ粍鐨勯潤榛樻ā寮忔槸鍚︽縺娲伙紙鍦ㄥソ鍙嬪拷鐣ユ鏌ヤ箣鍚庯級 + if silence_service.is_silence_active(from_group): + logger.info(f"缇ょ粍闈欓粯妯″紡婵�娲讳腑锛屽拷鐣ユ秷鎭�: fromGroup={from_group}") + return False + return True def enqueue_callback_message(self, callback_data: Dict[str, Any]) -> bool: @@ -117,11 +215,23 @@ if not self.is_valid_group_message(callback_data): return False - data = callback_data.get("data", {}) - from_user = data.get("fromUser") + # 瀵煎叆娑堟伅鑱氬悎鏈嶅姟锛堝欢杩熷鍏ラ伩鍏嶅惊鐜緷璧栵級 + from app.services.message_aggregator import message_aggregator - # 灏嗘暣涓洖璋冨唴瀹硅浆鎴怞SON瀛楃涓插瓨鍌ㄥ埌Redis闃熷垪 - return redis_queue.enqueue_message(from_user, callback_data) + # 灏濊瘯灏嗘秷鎭坊鍔犲埌鑱氬悎涓� + should_process_immediately, aggregated_data = message_aggregator.add_message_to_aggregation(callback_data) + + if should_process_immediately: + # 闇�瑕佺珛鍗冲鐞嗭紙涓嶈仛鍚堟垨鑱氬悎瓒呮椂锛� + data = aggregated_data.get("data", {}) if aggregated_data else callback_data.get("data", {}) + from_user = data.get("fromUser") + + # 灏嗘秷鎭姞鍏edis闃熷垪 + return redis_queue.enqueue_message(from_user, aggregated_data or callback_data) + else: + # 娑堟伅宸茶鑱氬悎锛屼笉闇�瑕佺珛鍗冲鐞� + logger.info(f"娑堟伅宸叉坊鍔犲埌鑱氬悎闃熷垪锛岀瓑寰呰仛鍚堝鐞�: fromUser={callback_data.get('data', {}).get('fromUser')}") + return True def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool: """ @@ -197,8 +307,15 @@ from_group = data.get("fromGroup") content = data.get("content") w_id = data.get("wId") + message_type = message_data.get("messageType") - logger.info(f"寮�濮嬪鐞嗘秷鎭�: from_user={from_user}, from_group={from_group}") + logger.info(f"寮�濮嬪鐞嗘秷鎭�: from_user={from_user}, from_group={from_group}, message_type={message_type}") + + # 鏍规嵁娑堟伅绫诲瀷澶勭悊鍐呭 + if message_type == "80014": + # 寮曠敤娑堟伅锛岄渶瑕佹彁鍙朮ML涓殑鍐呭骞朵笌title缁勫悎 + content = self.extract_refer_message_content(message_data) + logger.info(f"寮曠敤娑堟伅鍐呭澶勭悊瀹屾垚: content_length={len(content)}") # 浣跨敤涓婁笅鏂囩鐞嗗櫒纭繚鏁版嵁搴撲細璇濇纭鐞� with next(get_db()) as db: @@ -207,12 +324,19 @@ logger.error(f"鑱旂郴浜轰俊鎭鐞嗗け璐�: from_group={from_group}") return False - # 3.2 鑾峰彇鐢ㄦ埛鍦ㄥ綋鍓嶇兢缁勭殑conversation_id + # 3.2 鑾峰彇缇ょ粍涓彂瑷�娆℃暟鏈�澶氱殑鐢ㄦ埛鏄电О + most_active_nickname = group_stats_service.get_most_active_user_nickname(from_group) + logger.info(f"缇ょ粍鏈�娲昏穬鐢ㄦ埛鏄电О: group={from_group}, nickname={most_active_nickname}") + + # 3.3 鑾峰彇鐢ㄦ埛鍦ㄥ綋鍓嶇兢缁勭殑conversation_id conversation_id = redis_queue.get_conversation_id(from_user, from_group) - # 璋冪敤Dify鎺ュ彛鍙戦�佹秷鎭� - dify_response = dify_client.send_chat_message( - query=content, user=from_user, conversation_id=conversation_id + # 璋冪敤Dify鎺ュ彛鍙戦�佹秷鎭紙鏍规嵁閰嶇疆閫夋嫨妯″紡锛� + dify_response = dify_client.send_message( + query=content, + user=from_user, + conversation_id=conversation_id, + nick_name=most_active_nickname ) if not dify_response: @@ -225,9 +349,9 @@ # 鏇存柊Redis涓殑conversation_id锛堝熀浜庣敤鎴�+缇ょ粍锛� if new_conversation_id: - redis_queue.set_conversation_id(from_user, new_conversation_id, from_group) + redis_queue.set_conversation_id(from_user, new_conversation_id, from_group, 1800) - # 3.3 淇濆瓨瀵硅瘽璁板綍鍒版暟鎹簱 + # 3.4 淇濆瓨瀵硅瘽璁板綍鍒版暟鎹簱 # 鎸夌敤鎴枫�佺兢缁勫拰灏忔椂鍒嗙粍瀵硅瘽璁板綍 current_time = datetime.now() hour_key = current_time.strftime("%Y%m%d_%H") -- Gitblit v1.9.1