From 2e391d599d08ea7a7c11442bc2845a1191494c3d Mon Sep 17 00:00:00 2001 From: yj <2077506045@qq.com> Date: 星期四, 07 八月 2025 17:55:58 +0800 Subject: [PATCH] 新增消息聚会,短时间内同一用户发送的消息聚合才后发送给AI --- app/api/message_aggregation.py | 139 ++++++ app/services/ecloud_client.py | 3 startup.py | 128 ++++++ config.py | 5 main.py | 10 tests/test_message_aggregator.py | 405 ++++++++++++++++++++ app/services/message_aggregator.py | 328 ++++++++++++++++ ecloud_dify.spec | 1 app/services/message_processor.py | 98 ++++ logs/app.log | 24 10 files changed, 1,113 insertions(+), 28 deletions(-) diff --git a/app/api/message_aggregation.py b/app/api/message_aggregation.py new file mode 100644 index 0000000..817fdde --- /dev/null +++ b/app/api/message_aggregation.py @@ -0,0 +1,139 @@ +""" +娑堟伅鑱氬悎绠$悊鎺ュ彛 +""" + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel +from typing import Dict, Any, Optional +from loguru import logger + +from app.services.message_aggregator import message_aggregator + + +router = APIRouter() + + +class AggregationStatusResponse(BaseModel): + """鑱氬悎鐘舵�佸搷搴旀ā鍨�""" + + success: bool + message: str + data: Dict[str, Any] + + +class ForceProcessRequest(BaseModel): + """寮哄埗澶勭悊璇锋眰妯″瀷""" + + from_user: str + from_group: str + + +class ForceProcessResponse(BaseModel): + """寮哄埗澶勭悊鍝嶅簲妯″瀷""" + + success: bool + message: str + processed: bool + + +@router.get("/status", response_model=AggregationStatusResponse) +async def get_aggregation_status(): + """ + 鑾峰彇娑堟伅鑱氬悎鐘舵�� + + Returns: + 鑱氬悎鐘舵�佷俊鎭� + """ + try: + status = message_aggregator.get_aggregation_status() + + return AggregationStatusResponse( + success=True, message="鑾峰彇鑱氬悎鐘舵�佹垚鍔�", data=status + ) + + except Exception as e: + logger.error(f"鑾峰彇鑱氬悎鐘舵�佸紓甯�: {str(e)}") + raise HTTPException(status_code=500, detail=f"鑾峰彇鑱氬悎鐘舵�佸け璐�: {str(e)}") + + +@router.post("/force-process", response_model=ForceProcessResponse) +async def force_process_aggregation(request: ForceProcessRequest): + """ + 寮哄埗澶勭悊鎸囧畾鐢ㄦ埛鍜岀兢缁勭殑鑱氬悎娑堟伅 + + Args: + request: 寮哄埗澶勭悊璇锋眰 + + Returns: + 澶勭悊缁撴灉 + """ + try: + logger.info( + f"寮哄埗澶勭悊鑱氬悎娑堟伅璇锋眰: from_user={request.from_user}, from_group={request.from_group}" + ) + + processed = message_aggregator.force_process_aggregation( + request.from_user, request.from_group + ) + + if processed: + return ForceProcessResponse( + success=True, message="鑱氬悎娑堟伅澶勭悊鎴愬姛", processed=True + ) + else: + return ForceProcessResponse( + success=True, message="娌℃湁鎵惧埌寰呰仛鍚堢殑娑堟伅", processed=False + ) + + except Exception as e: + logger.error( + f"寮哄埗澶勭悊鑱氬悎娑堟伅寮傚父: from_user={request.from_user}, from_group={request.from_group}, error={str(e)}" + ) + raise HTTPException(status_code=500, detail=f"寮哄埗澶勭悊鑱氬悎娑堟伅澶辫触: {str(e)}") + + +@router.get("/config") +async def get_aggregation_config(): + """ + 鑾峰彇娑堟伅鑱氬悎閰嶇疆 + + Returns: + 鑱氬悎閰嶇疆淇℃伅 + """ + try: + config = { + "enabled": message_aggregator.aggregation_enabled, + "timeout_seconds": message_aggregator.aggregation_timeout, + } + + return {"success": True, "message": "鑾峰彇鑱氬悎閰嶇疆鎴愬姛", "data": config} + + except Exception as e: + logger.error(f"鑾峰彇鑱氬悎閰嶇疆寮傚父: {str(e)}") + raise HTTPException(status_code=500, detail=f"鑾峰彇鑱氬悎閰嶇疆澶辫触: {str(e)}") + + +@router.get("/health") +async def aggregation_health_check(): + """ + 娑堟伅鑱氬悎鏈嶅姟鍋ュ悍妫�鏌� + + Returns: + 鍋ュ悍鐘舵�� + """ + try: + status = message_aggregator.get_aggregation_status() + + return { + "success": True, + "message": "娑堟伅鑱氬悎鏈嶅姟杩愯姝e父", + "data": { + "service_status": "healthy", + "enabled": status["enabled"], + "active_aggregations": status["active_aggregations"], + }, + } + + except Exception as e: + logger.error(f"鑱氬悎鏈嶅姟鍋ュ悍妫�鏌ュ紓甯�: {str(e)}") + raise HTTPException(status_code=500, detail=f"鑱氬悎鏈嶅姟鍋ュ悍妫�鏌ュけ璐�: {str(e)}") diff --git a/app/services/ecloud_client.py b/app/services/ecloud_client.py index b8a64ef..0a26cac 100644 --- a/app/services/ecloud_client.py +++ b/app/services/ecloud_client.py @@ -38,7 +38,8 @@ if keyword in filtered_content: filtered_content = filtered_content.replace(keyword, "") logger.info(f"杩囨护鍏抽敭璇�: {keyword}") - + # 鍘婚櫎鍓嶅悗绌烘牸 + filtered_content = filtered_content.strip() return filtered_content def get_contact_info(self, w_id: str, wc_id: str) -> Optional[Dict[str, Any]]: diff --git a/app/services/message_aggregator.py b/app/services/message_aggregator.py new file mode 100644 index 0000000..9dbe3e0 --- /dev/null +++ b/app/services/message_aggregator.py @@ -0,0 +1,328 @@ +""" +娑堟伅鑱氬悎鏈嶅姟 +瀹炵幇鍚屼竴缇ょ粍涓悓涓�鐢ㄦ埛鐭椂闂村唴澶氭潯娑堟伅鐨勮仛鍚堝姛鑳� +""" + +import time +import threading +import json +from typing import Dict, List, Optional, Any, Tuple +from loguru import logger +from dataclasses import dataclass, field +from datetime import datetime, timedelta + +from config import settings + + +@dataclass +class PendingMessage: + """寰呰仛鍚堢殑娑堟伅""" + message_data: Dict[str, Any] + timestamp: float + content: str + from_user: str + from_group: str + + +@dataclass +class MessageAggregation: + """娑堟伅鑱氬悎瀵硅薄""" + from_user: str + from_group: str + messages: List[PendingMessage] = field(default_factory=list) + first_message_time: float = 0.0 + last_message_time: float = 0.0 + timer: Optional[threading.Timer] = None + lock: threading.Lock = field(default_factory=threading.Lock) + + def add_message(self, message: PendingMessage): + """娣诲姞娑堟伅鍒拌仛鍚堜腑""" + with self.lock: + self.messages.append(message) + if not self.first_message_time: + self.first_message_time = message.timestamp + self.last_message_time = message.timestamp + + def get_aggregated_content(self) -> str: + """鑾峰彇鑱氬悎鍚庣殑娑堟伅鍐呭""" + with self.lock: + if not self.messages: + return "" + + # 鎸夋椂闂存埑鎺掑簭娑堟伅 + sorted_messages = sorted(self.messages, key=lambda x: x.timestamp) + + # 鍚堝苟娑堟伅鍐呭 + contents = [msg.content for msg in sorted_messages] + return "\n".join(contents) + + def get_latest_message_data(self) -> Optional[Dict[str, Any]]: + """鑾峰彇鏈�鏂扮殑娑堟伅鏁版嵁浣滀负妯℃澘""" + with self.lock: + if not self.messages: + return None + + # 杩斿洖鏈�鍚庝竴鏉℃秷鎭殑鏁版嵁浣滀负妯℃澘 + latest_message = max(self.messages, key=lambda x: x.timestamp) + return latest_message.message_data.copy() + + def clear(self): + """娓呯┖鑱氬悎鏁版嵁""" + with self.lock: + self.messages.clear() + self.first_message_time = 0.0 + self.last_message_time = 0.0 + if self.timer: + self.timer.cancel() + self.timer = None + + +class MessageAggregatorService: + """娑堟伅鑱氬悎鏈嶅姟""" + + def __init__(self): + self.aggregations: Dict[str, MessageAggregation] = {} + self.lock = threading.Lock() + self.aggregation_timeout = getattr(settings, 'message_aggregation_timeout', 15) # 榛樿15绉� + self.aggregation_enabled = getattr(settings, 'message_aggregation_enabled', True) # 榛樿鍚敤 + + logger.info(f"娑堟伅鑱氬悎鏈嶅姟鍒濆鍖�: timeout={self.aggregation_timeout}s, enabled={self.aggregation_enabled}") + + def _get_aggregation_key(self, from_user: str, from_group: str) -> str: + """鑾峰彇鑱氬悎閿�""" + return f"{from_group}:{from_user}" + + def should_aggregate_message(self, message_data: Dict[str, Any]) -> bool: + """ + 鍒ゆ柇娑堟伅鏄惁搴旇琚仛鍚� + + Args: + message_data: 娑堟伅鏁版嵁 + + Returns: + 鏄惁搴旇鑱氬悎 + """ + if not self.aggregation_enabled: + return False + + # 妫�鏌ユ秷鎭被鍨嬫槸鍚︽槸缇よ亰娑堟伅 + message_type = message_data.get("messageType") + if message_type != "80001": + return False + + data = message_data.get("data", {}) + + # 妫�鏌ユ槸鍚︽槸鑷繁鍙戦�佺殑娑堟伅 + if data.get("self", False): + return False + + # 妫�鏌ュ繀瑕佸瓧娈� + if not data.get("fromUser") or not data.get("fromGroup") or not data.get("content"): + return False + + return True + + def add_message_to_aggregation(self, message_data: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]: + """ + 灏嗘秷鎭坊鍔犲埌鑱氬悎涓� + + Args: + message_data: 娑堟伅鏁版嵁 + + Returns: + (鏄惁闇�瑕佺珛鍗冲鐞�, 鑱氬悎鍚庣殑娑堟伅鏁版嵁) + - 濡傛灉杩斿洖 (False, None)锛岃〃绀烘秷鎭凡琚仛鍚堬紝涓嶉渶瑕佺珛鍗冲鐞� + - 濡傛灉杩斿洖 (True, aggregated_data)锛岃〃绀鸿仛鍚堣秴鏃讹紝闇�瑕佸鐞嗚仛鍚堝悗鐨勬秷鎭� + """ + if not self.should_aggregate_message(message_data): + # 涓嶉渶瑕佽仛鍚堬紝鐩存帴澶勭悊 + return True, message_data + + data = message_data.get("data", {}) + from_user = data.get("fromUser") + from_group = data.get("fromGroup") + content = data.get("content", "") + + aggregation_key = self._get_aggregation_key(from_user, from_group) + current_time = time.time() + + pending_message = PendingMessage( + message_data=message_data, + timestamp=current_time, + content=content, + from_user=from_user, + from_group=from_group + ) + + with self.lock: + if aggregation_key not in self.aggregations: + # 鍒涘缓鏂扮殑鑱氬悎 + self.aggregations[aggregation_key] = MessageAggregation( + from_user=from_user, + from_group=from_group + ) + + aggregation = self.aggregations[aggregation_key] + + # 鍙栨秷涔嬪墠鐨勫畾鏃跺櫒 + with aggregation.lock: + if aggregation.timer: + aggregation.timer.cancel() + + # 娣诲姞娑堟伅鍒拌仛鍚� + aggregation.add_message(pending_message) + + # 璁剧疆鏂扮殑瀹氭椂鍣� + timer = threading.Timer( + self.aggregation_timeout, + self._process_aggregated_messages, + args=(aggregation_key,) + ) + + with aggregation.lock: + aggregation.timer = timer + + timer.start() + + logger.info(f"娑堟伅宸叉坊鍔犲埌鑱氬悎: user={from_user}, group={from_group}, " + f"total_messages={len(aggregation.messages)}, timeout={self.aggregation_timeout}s") + + # 娑堟伅宸茶鑱氬悎锛屼笉闇�瑕佺珛鍗冲鐞� + return False, None + + def _process_aggregated_messages(self, aggregation_key: str): + """ + 澶勭悊鑱氬悎鐨勬秷鎭紙瀹氭椂鍣ㄥ洖璋冿級 + + Args: + aggregation_key: 鑱氬悎閿� + """ + try: + with self.lock: + if aggregation_key not in self.aggregations: + logger.warning(f"鑱氬悎閿笉瀛樺湪: {aggregation_key}") + return + + aggregation = self.aggregations[aggregation_key] + + # 鑾峰彇鑱氬悎鍚庣殑娑堟伅鏁版嵁 + aggregated_content = aggregation.get_aggregated_content() + latest_message_data = aggregation.get_latest_message_data() + + if not aggregated_content or not latest_message_data: + logger.warning(f"鑱氬悎鏁版嵁涓虹┖: {aggregation_key}") + self._cleanup_aggregation(aggregation_key) + return + + # 鍒涘缓鑱氬悎鍚庣殑娑堟伅鏁版嵁 + aggregated_message_data = latest_message_data.copy() + aggregated_message_data["data"]["content"] = aggregated_content + + message_count = len(aggregation.messages) + logger.info(f"澶勭悊鑱氬悎娑堟伅: user={aggregation.from_user}, group={aggregation.from_group}, " + f"message_count={message_count}, aggregated_content_length={len(aggregated_content)}") + + # 娓呯悊鑱氬悎鏁版嵁 + self._cleanup_aggregation(aggregation_key) + + # 璋冪敤娑堟伅澶勭悊鍣ㄥ鐞嗚仛鍚堝悗鐨勬秷鎭紙寤惰繜瀵煎叆閬垮厤寰幆渚濊禆锛� + from app.services.message_processor import message_processor + success = message_processor.process_single_message(aggregated_message_data) + + if success: + logger.info(f"鑱氬悎娑堟伅澶勭悊鎴愬姛: user={aggregation.from_user}, group={aggregation.from_group}") + else: + logger.error(f"鑱氬悎娑堟伅澶勭悊澶辫触: user={aggregation.from_user}, group={aggregation.from_group}") + + except Exception as e: + logger.error(f"澶勭悊鑱氬悎娑堟伅寮傚父: aggregation_key={aggregation_key}, error={str(e)}") + self._cleanup_aggregation(aggregation_key) + + def _cleanup_aggregation(self, aggregation_key: str): + """ + 娓呯悊鑱氬悎鏁版嵁 + + Args: + aggregation_key: 鑱氬悎閿� + """ + try: + with self.lock: + if aggregation_key in self.aggregations: + aggregation = self.aggregations[aggregation_key] + aggregation.clear() + del self.aggregations[aggregation_key] + logger.debug(f"鑱氬悎鏁版嵁宸叉竻鐞�: {aggregation_key}") + except Exception as e: + logger.error(f"娓呯悊鑱氬悎鏁版嵁寮傚父: aggregation_key={aggregation_key}, error={str(e)}") + + def get_aggregation_status(self) -> Dict[str, Any]: + """ + 鑾峰彇鑱氬悎鐘舵�佷俊鎭� + + Returns: + 鑱氬悎鐘舵�佷俊鎭� + """ + with self.lock: + status = { + "enabled": self.aggregation_enabled, + "timeout": self.aggregation_timeout, + "active_aggregations": len(self.aggregations), + "aggregations": {} + } + + for key, aggregation in self.aggregations.items(): + with aggregation.lock: + status["aggregations"][key] = { + "from_user": aggregation.from_user, + "from_group": aggregation.from_group, + "message_count": len(aggregation.messages), + "first_message_time": aggregation.first_message_time, + "last_message_time": aggregation.last_message_time, + "elapsed_time": time.time() - aggregation.first_message_time if aggregation.first_message_time else 0 + } + + return status + + def force_process_aggregation(self, from_user: str, from_group: str) -> bool: + """ + 寮哄埗澶勭悊鎸囧畾鐢ㄦ埛鍜岀兢缁勭殑鑱氬悎娑堟伅 + + Args: + from_user: 鐢ㄦ埛ID + from_group: 缇ょ粍ID + + Returns: + 鏄惁鎴愬姛澶勭悊 + """ + aggregation_key = self._get_aggregation_key(from_user, from_group) + + with self.lock: + if aggregation_key not in self.aggregations: + logger.warning(f"娌℃湁鎵惧埌寰呰仛鍚堢殑娑堟伅: user={from_user}, group={from_group}") + return False + + logger.info(f"寮哄埗澶勭悊鑱氬悎娑堟伅: user={from_user}, group={from_group}") + self._process_aggregated_messages(aggregation_key) + return True + + def stop(self): + """鍋滄鑱氬悎鏈嶅姟""" + logger.info("姝e湪鍋滄娑堟伅鑱氬悎鏈嶅姟...") + + with self.lock: + for aggregation_key, aggregation in self.aggregations.items(): + try: + with aggregation.lock: + if aggregation.timer: + aggregation.timer.cancel() + logger.debug(f"宸插彇娑堣仛鍚堝畾鏃跺櫒: {aggregation_key}") + except Exception as e: + logger.error(f"鍙栨秷鑱氬悎瀹氭椂鍣ㄥ紓甯�: {aggregation_key}, error={str(e)}") + + self.aggregations.clear() + + logger.info("娑堟伅鑱氬悎鏈嶅姟宸插仠姝�") + + +# 鍏ㄥ眬娑堟伅鑱氬悎鏈嶅姟瀹炰緥 +message_aggregator = MessageAggregatorService() diff --git a/app/services/message_processor.py b/app/services/message_processor.py index 11b1bf6..5ac473c 100644 --- a/app/services/message_processor.py +++ b/app/services/message_processor.py @@ -5,6 +5,7 @@ import json import time import re +import xml.etree.ElementTree as ET from typing import Dict, Any, Optional, List, Tuple from datetime import datetime from sqlalchemy.orm import Session @@ -27,6 +28,69 @@ def __init__(self): pass + + def extract_refer_message_content(self, callback_data: Dict[str, Any]) -> str: + """ + 鎻愬彇寮曠敤娑堟伅鐨勫唴瀹癸紙message_type涓�80014锛� + + Args: + callback_data: 鍥炶皟鏁版嵁 + + Returns: + 缁勫悎鍚庣殑娑堟伅鍐呭锛坈ontent + title锛屼腑闂寸┖涓�琛岋級 + """ + try: + data = callback_data.get("data", {}) + content = data.get("content", "") + title = data.get("title", "") + + # 瑙f瀽XML鍐呭锛屾彁鍙杕sg>appmsg>refermsg>content鏍囩涓殑鍐呭 + refer_content = "" + xml_title = "" + try: + # 瑙f瀽XML + root = ET.fromstring(content) + + # 鏌ユ壘msg>appmsg>refermsg>content璺緞 + appmsg = root.find("appmsg") + if appmsg is not None: + # 鎻愬彇XML涓殑title锛堝鏋滃瓨鍦級 + title_element = appmsg.find("title") + if title_element is not None and title_element.text: + xml_title = title_element.text.strip() + + # 鎻愬彇寮曠敤娑堟伅鍐呭 + refermsg = appmsg.find("refermsg") + if refermsg is not None: + content_element = refermsg.find("content") + if content_element is not None and content_element.text: + refer_content = content_element.text.strip() + + except ET.ParseError as e: + logger.warning(f"XML瑙f瀽澶辫触: {str(e)}, content={content}") + # 濡傛灉XML瑙f瀽澶辫触锛屼娇鐢ㄥ師濮媍ontent + refer_content = content + + # 纭畾鏈�缁堜娇鐢ㄧ殑title锛氫紭鍏堜娇鐢╔ML涓殑title锛屽叾娆′娇鐢╠ata.title + final_title = xml_title if xml_title else title + + # 缁勫悎鍐呭锛歳efer_content鍦ㄥ墠锛宖inal_title鍦ㄥ悗锛屼腑闂寸┖涓�琛� + if refer_content and final_title: + combined_content = f"{refer_content}\n\n{final_title}" + elif refer_content: + combined_content = refer_content + elif final_title: + combined_content = final_title + else: + combined_content = content # 濡傛灉閮芥病鏈夛紝浣跨敤鍘熷content + + logger.info(f"寮曠敤娑堟伅鍐呭鎻愬彇瀹屾垚: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}") + return combined_content + + except Exception as e: + logger.error(f"鎻愬彇寮曠敤娑堟伅鍐呭寮傚父: error={str(e)}") + # 寮傚父鎯呭喌涓嬭繑鍥炲師濮媍ontent + return callback_data.get("data", {}).get("content", "") def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]: """ @@ -83,9 +147,10 @@ Returns: 鏄惁鏄湁鏁堢殑缇よ亰娑堟伅 """ - # 妫�鏌ユ秷鎭被鍨嬫槸鍚︽槸缇よ亰娑堟伅锛�80001锛� + # 妫�鏌ユ秷鎭被鍨嬫槸鍚︽槸缇よ亰娑堟伅锛�80001锛夋垨寮曠敤娑堟伅锛�80014锛� message_type = callback_data.get("messageType") - if message_type != "80001": + print(f"data: {callback_data}") + if message_type not in ["80001", "80014"]: logger.info(f"蹇界暐闈炵兢鑱婃秷鎭�: messageType={message_type}") return False @@ -150,11 +215,23 @@ if not self.is_valid_group_message(callback_data): return False - data = callback_data.get("data", {}) - from_user = data.get("fromUser") + # 瀵煎叆娑堟伅鑱氬悎鏈嶅姟锛堝欢杩熷鍏ラ伩鍏嶅惊鐜緷璧栵級 + from app.services.message_aggregator import message_aggregator - # 灏嗘暣涓洖璋冨唴瀹硅浆鎴怞SON瀛楃涓插瓨鍌ㄥ埌Redis闃熷垪 - return redis_queue.enqueue_message(from_user, callback_data) + # 灏濊瘯灏嗘秷鎭坊鍔犲埌鑱氬悎涓� + should_process_immediately, aggregated_data = message_aggregator.add_message_to_aggregation(callback_data) + + if should_process_immediately: + # 闇�瑕佺珛鍗冲鐞嗭紙涓嶈仛鍚堟垨鑱氬悎瓒呮椂锛� + data = aggregated_data.get("data", {}) if aggregated_data else callback_data.get("data", {}) + from_user = data.get("fromUser") + + # 灏嗘秷鎭姞鍏edis闃熷垪 + return redis_queue.enqueue_message(from_user, aggregated_data or callback_data) + else: + # 娑堟伅宸茶鑱氬悎锛屼笉闇�瑕佺珛鍗冲鐞� + logger.info(f"娑堟伅宸叉坊鍔犲埌鑱氬悎闃熷垪锛岀瓑寰呰仛鍚堝鐞�: fromUser={callback_data.get('data', {}).get('fromUser')}") + return True def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool: """ @@ -230,8 +307,15 @@ from_group = data.get("fromGroup") content = data.get("content") w_id = data.get("wId") + message_type = message_data.get("messageType") - logger.info(f"寮�濮嬪鐞嗘秷鎭�: from_user={from_user}, from_group={from_group}") + logger.info(f"寮�濮嬪鐞嗘秷鎭�: from_user={from_user}, from_group={from_group}, message_type={message_type}") + + # 鏍规嵁娑堟伅绫诲瀷澶勭悊鍐呭 + if message_type == "80014": + # 寮曠敤娑堟伅锛岄渶瑕佹彁鍙朮ML涓殑鍐呭骞朵笌title缁勫悎 + content = self.extract_refer_message_content(message_data) + logger.info(f"寮曠敤娑堟伅鍐呭澶勭悊瀹屾垚: content_length={len(content)}") # 浣跨敤涓婁笅鏂囩鐞嗗櫒纭繚鏁版嵁搴撲細璇濇纭鐞� with next(get_db()) as db: diff --git a/config.py b/config.py index e0b1455..2fcbf94 100644 --- a/config.py +++ b/config.py @@ -111,6 +111,11 @@ self.keyword_filter_enabled = keyword_filter_config.get("enabled", False) self.keyword_filter_keywords = keyword_filter_config.get("keywords", []) + # 娑堟伅鑱氬悎閰嶇疆 + message_aggregation_config = config_data.get("message_aggregation", {}) + self.message_aggregation_enabled = message_aggregation_config.get("enabled", True) + self.message_aggregation_timeout = message_aggregation_config.get("timeout_seconds", 15) + def update_ecloud_w_id(self, new_w_id: str) -> bool: """ 鍔ㄦ�佹洿鏂癊浜戠瀹剁殑w_id閰嶇疆 diff --git a/ecloud_dify.spec b/ecloud_dify.spec index 528fbd5..a2ac334 100644 --- a/ecloud_dify.spec +++ b/ecloud_dify.spec @@ -11,7 +11,6 @@ # 娣诲姞閰嶇疆鏂囦欢 datas.append(('config.example.json', '.')) -datas.append(('config.production.json', '.')) if os.path.exists('config.json'): datas.append(('config.json', '.')) diff --git a/logs/app.log b/logs/app.log index 78fc2ba..4f17013 100644 --- a/logs/app.log +++ b/logs/app.log @@ -1,19 +1,5 @@ -2025-07-28 09:54:56 | INFO | __main__:<module>:122 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 10:41:14 | INFO | __main__:<module>:122 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 10:42:46 | INFO | __main__:<module>:122 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 11:10:52 | INFO | __main__:<module>:122 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 11:23:47 | INFO | __main__:<module>:124 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 11:41:55 | INFO | __main__:<module>:124 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 11:44:17 | INFO | __main__:<module>:124 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 14:08:32 | INFO | __main__:<module>:124 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 14:31:14 | INFO | __main__:<module>:124 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 16:07:05 | INFO | __main__:<module>:139 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 16:23:36 | INFO | __main__:<module>:152 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 16:39:45 | INFO | __main__:<module>:159 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 17:17:10 | INFO | __main__:<module>:161 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 17:18:18 | INFO | __main__:<module>:161 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 17:29:25 | INFO | __main__:<module>:122 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 17:29:50 | INFO | __main__:<module>:122 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 17:34:00 | INFO | __main__:<module>:122 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 17:44:45 | INFO | __main__:<module>:122 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 -2025-07-28 18:12:07 | INFO | __main__:<module>:122 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 +2025-08-07 16:49:53 | INFO | __main__:<module>:132 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 +2025-08-07 17:07:18 | INFO | __main__:<module>:132 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 +2025-08-07 17:09:51 | INFO | __main__:<module>:132 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 +2025-08-07 17:48:05 | INFO | __main__:<module>:132 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 +2025-08-07 17:51:18 | INFO | __main__:<module>:132 - 鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟 diff --git a/main.py b/main.py index cba621f..e6b60b3 100644 --- a/main.py +++ b/main.py @@ -11,9 +11,11 @@ from config import settings from app.api.callback import router as callback_router from app.api.friend_ignore import router as friend_ignore_router +from app.api.message_aggregation import router as message_aggregation_router from app.models.database import create_tables from app.workers.message_worker import message_worker from app.services.contact_sync import contact_sync_service +from app.services.message_aggregator import message_aggregator @asynccontextmanager @@ -57,6 +59,13 @@ # 鍏抽棴鏃舵墽琛� logger.info("姝e湪鍏抽棴搴旂敤...") + # 鍋滄娑堟伅鑱氬悎鏈嶅姟 + try: + message_aggregator.stop() + logger.info("娑堟伅鑱氬悎鏈嶅姟宸插仠姝�") + except Exception as e: + logger.error(f"娑堟伅鑱氬悎鏈嶅姟鍋滄澶辫触: {str(e)}") + # 鍋滄娑堟伅宸ヤ綔杩涚▼ try: message_worker.stop() @@ -87,6 +96,7 @@ # 娉ㄥ唽璺敱 app.include_router(callback_router, prefix="/api/v1", tags=["鍥炶皟鎺ュ彛"]) app.include_router(friend_ignore_router, prefix="/api/v1", tags=["濂藉弸蹇界暐绠$悊"]) +app.include_router(message_aggregation_router, prefix="/api/v1/aggregation", tags=["娑堟伅鑱氬悎绠$悊"]) @app.get("/") diff --git a/startup.py b/startup.py new file mode 100644 index 0000000..2f02c8d --- /dev/null +++ b/startup.py @@ -0,0 +1,128 @@ +""" +鍚姩鑴氭湰 - 澶勭悊閰嶇疆鏂囦欢鍜岀洰褰曞垵濮嬪寲 +""" + +import os +import sys +import json +import shutil +from pathlib import Path + + +def get_exe_dir(): + """鑾峰彇exe鏂囦欢鎵�鍦ㄧ洰褰�""" + if getattr(sys, "frozen", False): + # 濡傛灉鏄墦鍖呭悗鐨別xe鏂囦欢 + return os.path.dirname(sys.executable) + else: + # 濡傛灉鏄紑鍙戠幆澧� + return os.path.dirname(os.path.abspath(__file__)) + + +def ensure_directories(): + """纭繚蹇呰鐨勭洰褰曞瓨鍦�""" + exe_dir = get_exe_dir() + + # 鍒涘缓鏃ュ織鐩綍 + logs_dir = os.path.join(exe_dir, "logs") + if not os.path.exists(logs_dir): + os.makedirs(logs_dir) + print(f"鍒涘缓鏃ュ織鐩綍: {logs_dir}") + + return exe_dir + + +def ensure_config_file(): + """纭繚閰嶇疆鏂囦欢瀛樺湪""" + exe_dir = get_exe_dir() + config_file = os.path.join(exe_dir, "config.json") + + if not os.path.exists(config_file): + # 濡傛灉config.json涓嶅瓨鍦紝灏濊瘯澶嶅埗鐢熶骇閰嶇疆妯℃澘 + production_config = os.path.join(exe_dir, "config.production.json") + example_config = os.path.join(exe_dir, "config.example.json") + + if os.path.exists(production_config): + shutil.copy2(production_config, config_file) + print(f"澶嶅埗鐢熶骇閰嶇疆鏂囦欢: {production_config} -> {config_file}") + elif os.path.exists(example_config): + shutil.copy2(example_config, config_file) + print(f"澶嶅埗绀轰緥閰嶇疆鏂囦欢: {example_config} -> {config_file}") + else: + # 鍒涘缓榛樿閰嶇疆鏂囦欢 + default_config = { + "database": { + "url": "mysql+pymysql://root:password@localhost:3306/ecloud_dify" + }, + "redis": {"url": "redis://localhost:6379/0"}, + "ecloud": { + "base_url": "http://125.122.152.142:9899", + "authorization": "your_ecloud_authorization_token", + "w_id": "your_ecloud_w_id", + }, + "dify": { + "base_url": "https://api.dify.ai/v1", + "api_key": "your_dify_api_key", + }, + "server": {"host": "0.0.0.0", "port": 7979, "debug": False}, + "logging": {"level": "INFO", "file": "logs/app.log"}, + "message_processing": { + "max_retry_count": 3, + "retry_delay": 5, + "queue_timeout": 300, + }, + "customer_service": {"names": ["瀹㈡湇1", "瀹㈡湇2"]}, + } + + with open(config_file, "w", encoding="utf-8") as f: + json.dump(default_config, f, indent=2, ensure_ascii=False) + print(f"鍒涘缓榛樿閰嶇疆鏂囦欢: {config_file}") + + return config_file + + +def setup_environment(): + """璁剧疆杩愯鐜""" + exe_dir = ensure_directories() + config_file = ensure_config_file() + + # 璁剧疆宸ヤ綔鐩綍涓篹xe鎵�鍦ㄧ洰褰� + os.chdir(exe_dir) + print(f"璁剧疆宸ヤ綔鐩綍: {exe_dir}") + + return exe_dir, config_file + + +if __name__ == "__main__": + setup_environment() + + # 瀵煎叆骞跺惎鍔ㄤ富搴旂敤 + try: + from main import app + import uvicorn + from config import settings + from loguru import logger + + # 閰嶇疆鏃ュ織 + logger.add( + settings.log_file, + rotation="1 day", + retention="7 days", + level=settings.log_level, + format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}", + ) + + logger.info("鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟") + + # 鍚姩鏈嶅姟 + uvicorn.run( + app, + host=settings.server_host, + port=settings.server_port, + log_level=settings.log_level.lower(), + ) + + except Exception as e: + print(f"鍚姩澶辫触: {e}") + input("鎸変换鎰忛敭閫�鍑�...") + sys.exit(1) diff --git a/tests/test_message_aggregator.py b/tests/test_message_aggregator.py new file mode 100644 index 0000000..42a7a27 --- /dev/null +++ b/tests/test_message_aggregator.py @@ -0,0 +1,405 @@ +""" +娑堟伅鑱氬悎鏈嶅姟娴嬭瘯 +""" + +import time +import threading +import pytest +from unittest.mock import patch, MagicMock +from app.services.message_aggregator import MessageAggregatorService, PendingMessage, MessageAggregation + + +class TestMessageAggregatorService: + """娑堟伅鑱氬悎鏈嶅姟娴嬭瘯绫�""" + + def setup_method(self): + """娴嬭瘯鍓嶅噯澶�""" + self.aggregator = MessageAggregatorService() + # 璁剧疆杈冪煭鐨勮秴鏃舵椂闂寸敤浜庢祴璇� + self.aggregator.aggregation_timeout = 2 + self.aggregator.aggregation_enabled = True + + def teardown_method(self): + """娴嬭瘯鍚庢竻鐞�""" + self.aggregator.stop() + + def test_should_aggregate_message_valid(self): + """娴嬭瘯鏈夋晥娑堟伅鐨勮仛鍚堝垽鏂�""" + message_data = { + "messageType": "80001", + "data": { + "fromUser": "wxid_test123", + "fromGroup": "group123@chatroom", + "content": "娴嬭瘯娑堟伅", + "self": False + } + } + + result = self.aggregator.should_aggregate_message(message_data) + assert result is True + + def test_should_aggregate_message_disabled(self): + """娴嬭瘯鑱氬悎鍔熻兘绂佺敤鏃剁殑鍒ゆ柇""" + self.aggregator.aggregation_enabled = False + + message_data = { + "messageType": "80001", + "data": { + "fromUser": "wxid_test123", + "fromGroup": "group123@chatroom", + "content": "娴嬭瘯娑堟伅", + "self": False + } + } + + result = self.aggregator.should_aggregate_message(message_data) + assert result is False + + def test_should_aggregate_message_wrong_type(self): + """娴嬭瘯閿欒娑堟伅绫诲瀷鐨勮仛鍚堝垽鏂�""" + message_data = { + "messageType": "80002", # 闈炵兢鑱婃秷鎭� + "data": { + "fromUser": "wxid_test123", + "fromGroup": "group123@chatroom", + "content": "娴嬭瘯娑堟伅", + "self": False + } + } + + result = self.aggregator.should_aggregate_message(message_data) + assert result is False + + def test_should_aggregate_message_self_message(self): + """娴嬭瘯鑷繁鍙戦�佺殑娑堟伅鐨勮仛鍚堝垽鏂�""" + message_data = { + "messageType": "80001", + "data": { + "fromUser": "wxid_test123", + "fromGroup": "group123@chatroom", + "content": "娴嬭瘯娑堟伅", + "self": True # 鑷繁鍙戦�佺殑娑堟伅 + } + } + + result = self.aggregator.should_aggregate_message(message_data) + assert result is False + + def test_should_aggregate_message_missing_fields(self): + """娴嬭瘯缂哄皯蹇呰瀛楁鐨勬秷鎭殑鑱氬悎鍒ゆ柇""" + message_data = { + "messageType": "80001", + "data": { + "fromUser": "wxid_test123", + # 缂哄皯 fromGroup + "content": "娴嬭瘯娑堟伅", + "self": False + } + } + + result = self.aggregator.should_aggregate_message(message_data) + assert result is False + + def test_add_message_to_aggregation_no_aggregation(self): + """娴嬭瘯涓嶉渶瑕佽仛鍚堢殑娑堟伅""" + self.aggregator.aggregation_enabled = False + + message_data = { + "messageType": "80001", + "data": { + "fromUser": "wxid_test123", + "fromGroup": "group123@chatroom", + "content": "娴嬭瘯娑堟伅", + "self": False + } + } + + should_process, aggregated_data = self.aggregator.add_message_to_aggregation(message_data) + + assert should_process is True + assert aggregated_data == message_data + + def test_add_message_to_aggregation_first_message(self): + """娴嬭瘯娣诲姞绗竴鏉℃秷鎭埌鑱氬悎""" + message_data = { + "messageType": "80001", + "data": { + "fromUser": "wxid_test123", + "fromGroup": "group123@chatroom", + "content": "绗竴鏉℃秷鎭�", + "self": False + } + } + + should_process, aggregated_data = self.aggregator.add_message_to_aggregation(message_data) + + assert should_process is False + assert aggregated_data is None + + # 妫�鏌ヨ仛鍚堢姸鎬� + status = self.aggregator.get_aggregation_status() + assert status["active_aggregations"] == 1 + assert "group123@chatroom:wxid_test123" in status["aggregations"] + + def test_add_message_to_aggregation_multiple_messages(self): + """娴嬭瘯娣诲姞澶氭潯娑堟伅鍒拌仛鍚�""" + message_data_1 = { + "messageType": "80001", + "data": { + "fromUser": "wxid_test123", + "fromGroup": "group123@chatroom", + "content": "绗竴鏉℃秷鎭�", + "self": False + } + } + + message_data_2 = { + "messageType": "80001", + "data": { + "fromUser": "wxid_test123", + "fromGroup": "group123@chatroom", + "content": "绗簩鏉℃秷鎭�", + "self": False + } + } + + # 娣诲姞绗竴鏉℃秷鎭� + should_process_1, _ = self.aggregator.add_message_to_aggregation(message_data_1) + assert should_process_1 is False + + # 娣诲姞绗簩鏉℃秷鎭� + should_process_2, _ = self.aggregator.add_message_to_aggregation(message_data_2) + assert should_process_2 is False + + # 妫�鏌ヨ仛鍚堢姸鎬� + status = self.aggregator.get_aggregation_status() + aggregation_info = status["aggregations"]["group123@chatroom:wxid_test123"] + assert aggregation_info["message_count"] == 2 + + @patch('app.services.message_processor.message_processor') + def test_process_aggregated_messages_timeout(self, mock_message_processor): + """娴嬭瘯鑱氬悎娑堟伅瓒呮椂澶勭悊""" + mock_message_processor.process_single_message.return_value = True + + message_data = { + "messageType": "80001", + "data": { + "fromUser": "wxid_test123", + "fromGroup": "group123@chatroom", + "content": "娴嬭瘯娑堟伅", + "self": False + } + } + + # 娣诲姞娑堟伅鍒拌仛鍚� + should_process, _ = self.aggregator.add_message_to_aggregation(message_data) + assert should_process is False + + # 绛夊緟瓒呮椂澶勭悊 + time.sleep(2.5) + + # 楠岃瘉娑堟伅澶勭悊鍣ㄨ璋冪敤 + mock_message_processor.process_single_message.assert_called_once() + + # 妫�鏌ヨ仛鍚堝凡琚竻鐞� + status = self.aggregator.get_aggregation_status() + assert status["active_aggregations"] == 0 + + def test_get_aggregation_status(self): + """娴嬭瘯鑾峰彇鑱氬悎鐘舵��""" + # 鍒濆鐘舵�� + status = self.aggregator.get_aggregation_status() + assert status["enabled"] is True + assert status["timeout"] == 2 + assert status["active_aggregations"] == 0 + assert status["aggregations"] == {} + + # 娣诲姞娑堟伅鍚庣殑鐘舵�� + message_data = { + "messageType": "80001", + "data": { + "fromUser": "wxid_test123", + "fromGroup": "group123@chatroom", + "content": "娴嬭瘯娑堟伅", + "self": False + } + } + + self.aggregator.add_message_to_aggregation(message_data) + + status = self.aggregator.get_aggregation_status() + assert status["active_aggregations"] == 1 + assert "group123@chatroom:wxid_test123" in status["aggregations"] + + aggregation_info = status["aggregations"]["group123@chatroom:wxid_test123"] + assert aggregation_info["from_user"] == "wxid_test123" + assert aggregation_info["from_group"] == "group123@chatroom" + assert aggregation_info["message_count"] == 1 + + @patch('app.services.message_processor.message_processor') + def test_force_process_aggregation(self, mock_message_processor): + """娴嬭瘯寮哄埗澶勭悊鑱氬悎娑堟伅""" + mock_message_processor.process_single_message.return_value = True + + message_data = { + "messageType": "80001", + "data": { + "fromUser": "wxid_test123", + "fromGroup": "group123@chatroom", + "content": "娴嬭瘯娑堟伅", + "self": False + } + } + + # 娣诲姞娑堟伅鍒拌仛鍚� + self.aggregator.add_message_to_aggregation(message_data) + + # 寮哄埗澶勭悊鑱氬悎 + result = self.aggregator.force_process_aggregation("wxid_test123", "group123@chatroom") + assert result is True + + # 楠岃瘉娑堟伅澶勭悊鍣ㄨ璋冪敤 + mock_message_processor.process_single_message.assert_called_once() + + # 妫�鏌ヨ仛鍚堝凡琚竻鐞� + status = self.aggregator.get_aggregation_status() + assert status["active_aggregations"] == 0 + + def test_force_process_aggregation_not_found(self): + """娴嬭瘯寮哄埗澶勭悊涓嶅瓨鍦ㄧ殑鑱氬悎""" + result = self.aggregator.force_process_aggregation("wxid_test123", "group123@chatroom") + assert result is False + + +class TestPendingMessage: + """寰呰仛鍚堟秷鎭祴璇曠被""" + + def test_pending_message_creation(self): + """娴嬭瘯寰呰仛鍚堟秷鎭垱寤�""" + message_data = {"test": "data"} + timestamp = time.time() + + pending_message = PendingMessage( + message_data=message_data, + timestamp=timestamp, + content="娴嬭瘯鍐呭", + from_user="wxid_test123", + from_group="group123@chatroom" + ) + + assert pending_message.message_data == message_data + assert pending_message.timestamp == timestamp + assert pending_message.content == "娴嬭瘯鍐呭" + assert pending_message.from_user == "wxid_test123" + assert pending_message.from_group == "group123@chatroom" + + +class TestMessageAggregation: + """娑堟伅鑱氬悎瀵硅薄娴嬭瘯绫�""" + + def test_message_aggregation_creation(self): + """娴嬭瘯娑堟伅鑱氬悎瀵硅薄鍒涘缓""" + aggregation = MessageAggregation( + from_user="wxid_test123", + from_group="group123@chatroom" + ) + + assert aggregation.from_user == "wxid_test123" + assert aggregation.from_group == "group123@chatroom" + assert len(aggregation.messages) == 0 + assert aggregation.first_message_time == 0.0 + assert aggregation.last_message_time == 0.0 + assert aggregation.timer is None + + def test_add_message(self): + """娴嬭瘯娣诲姞娑堟伅鍒拌仛鍚�""" + aggregation = MessageAggregation( + from_user="wxid_test123", + from_group="group123@chatroom" + ) + + timestamp = time.time() + pending_message = PendingMessage( + message_data={"test": "data"}, + timestamp=timestamp, + content="娴嬭瘯鍐呭", + from_user="wxid_test123", + from_group="group123@chatroom" + ) + + aggregation.add_message(pending_message) + + assert len(aggregation.messages) == 1 + assert aggregation.first_message_time == timestamp + assert aggregation.last_message_time == timestamp + + def test_get_aggregated_content(self): + """娴嬭瘯鑾峰彇鑱氬悎鍚庣殑鍐呭""" + aggregation = MessageAggregation( + from_user="wxid_test123", + from_group="group123@chatroom" + ) + + # 娣诲姞澶氭潯娑堟伅 + for i in range(3): + pending_message = PendingMessage( + message_data={"test": f"data{i}"}, + timestamp=time.time() + i, + content=f"娑堟伅{i+1}", + from_user="wxid_test123", + from_group="group123@chatroom" + ) + aggregation.add_message(pending_message) + + aggregated_content = aggregation.get_aggregated_content() + assert aggregated_content == "娑堟伅1\n娑堟伅2\n娑堟伅3" + + def test_get_latest_message_data(self): + """娴嬭瘯鑾峰彇鏈�鏂版秷鎭暟鎹�""" + aggregation = MessageAggregation( + from_user="wxid_test123", + from_group="group123@chatroom" + ) + + # 娣诲姞澶氭潯娑堟伅 + for i in range(3): + pending_message = PendingMessage( + message_data={"test": f"data{i}", "timestamp": i}, + timestamp=time.time() + i, + content=f"娑堟伅{i+1}", + from_user="wxid_test123", + from_group="group123@chatroom" + ) + aggregation.add_message(pending_message) + + latest_data = aggregation.get_latest_message_data() + assert latest_data["test"] == "data2" # 鏈�鍚庝竴鏉℃秷鎭� + assert latest_data["timestamp"] == 2 + + def test_clear(self): + """娴嬭瘯娓呯┖鑱氬悎鏁版嵁""" + aggregation = MessageAggregation( + from_user="wxid_test123", + from_group="group123@chatroom" + ) + + # 娣诲姞娑堟伅 + pending_message = PendingMessage( + message_data={"test": "data"}, + timestamp=time.time(), + content="娴嬭瘯鍐呭", + from_user="wxid_test123", + from_group="group123@chatroom" + ) + aggregation.add_message(pending_message) + + # 璁剧疆瀹氭椂鍣� + aggregation.timer = threading.Timer(1, lambda: None) + + # 娓呯┖ + aggregation.clear() + + assert len(aggregation.messages) == 0 + assert aggregation.first_message_time == 0.0 + assert aggregation.last_message_time == 0.0 + assert aggregation.timer is None -- Gitblit v1.9.1