From 2e391d599d08ea7a7c11442bc2845a1191494c3d Mon Sep 17 00:00:00 2001
From: yj <2077506045@qq.com>
Date: Thu, 07 Aug 2025 17:55:58 +0800
Subject: [PATCH] Add message aggregation: messages sent by the same user within a short window are aggregated before being forwarded to the AI

---
 app/api/message_aggregation.py     |  139 ++++++
 app/services/ecloud_client.py      |    3 
 startup.py                         |  128 ++++++
 config.py                          |    5 
 main.py                            |   10 
 tests/test_message_aggregator.py   |  405 ++++++++++++++++++++
 app/services/message_aggregator.py |  328 ++++++++++++++++
 ecloud_dify.spec                   |    1 
 app/services/message_processor.py  |   98 ++++
 logs/app.log                       |   24 
 10 files changed, 1,113 insertions(+), 28 deletions(-)
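
For orientation, a minimal sketch of the aggregation contract this patch introduces.
The payload below is illustrative only: field values such as "wxid_example" and
"example@chatroom" are placeholders; only the structure mirrors what
message_aggregator expects.

    from app.services.message_aggregator import message_aggregator

    # Illustrative group-chat callback (messageType 80001); values are examples only.
    callback_data = {
        "messageType": "80001",
        "data": {
            "fromUser": "wxid_example",
            "fromGroup": "example@chatroom",
            "content": "hello",
            "self": False,
        },
    }

    # Returns (True, data) when the message should be processed immediately
    # (it does not qualify for aggregation), or (False, None) when it was
    # buffered and will be flushed by a timer after message_aggregation_timeout
    # seconds without further messages from the same user in the same group.
    should_process, data = message_aggregator.add_message_to_aggregation(callback_data)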

diff --git a/app/api/message_aggregation.py b/app/api/message_aggregation.py
new file mode 100644
index 0000000..817fdde
--- /dev/null
+++ b/app/api/message_aggregation.py
@@ -0,0 +1,139 @@
+"""
+Message aggregation management API
+"""
+
+from fastapi import APIRouter, HTTPException
+from pydantic import BaseModel
+from typing import Dict, Any, Optional
+from loguru import logger
+
+from app.services.message_aggregator import message_aggregator
+
+
+router = APIRouter()
+
+
+class AggregationStatusResponse(BaseModel):
+    """鑱氬悎鐘舵�佸搷搴旀ā鍨�"""
+
+    success: bool
+    message: str
+    data: Dict[str, Any]
+
+
+class ForceProcessRequest(BaseModel):
+    """寮哄埗澶勭悊璇锋眰妯″瀷"""
+
+    from_user: str
+    from_group: str
+
+
+class ForceProcessResponse(BaseModel):
+    """寮哄埗澶勭悊鍝嶅簲妯″瀷"""
+
+    success: bool
+    message: str
+    processed: bool
+
+
+@router.get("/status", response_model=AggregationStatusResponse)
+async def get_aggregation_status():
+    """
+    Get the message aggregation status
+
+    Returns:
+        Aggregation status information
+    """
+    try:
+        status = message_aggregator.get_aggregation_status()
+
+        return AggregationStatusResponse(
+            success=True, message="Aggregation status retrieved successfully", data=status
+        )
+
+    except Exception as e:
+        logger.error(f"鑾峰彇鑱氬悎鐘舵�佸紓甯�: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"鑾峰彇鑱氬悎鐘舵�佸け璐�: {str(e)}")
+
+
+@router.post("/force-process", response_model=ForceProcessResponse)
+async def force_process_aggregation(request: ForceProcessRequest):
+    """
+    Force-process the aggregated messages for a given user and group
+
+    Args:
+        request: Force-process request
+
+    Returns:
+        Processing result
+    """
+    try:
+        logger.info(
+            f"寮哄埗澶勭悊鑱氬悎娑堟伅璇锋眰: from_user={request.from_user}, from_group={request.from_group}"
+        )
+
+        processed = message_aggregator.force_process_aggregation(
+            request.from_user, request.from_group
+        )
+
+        if processed:
+            return ForceProcessResponse(
+                success=True, message="Aggregated messages processed successfully", processed=True
+            )
+        else:
+            return ForceProcessResponse(
+                success=True, message="No pending aggregated messages found", processed=False
+            )
+
+    except Exception as e:
+        logger.error(
+            f"寮哄埗澶勭悊鑱氬悎娑堟伅寮傚父: from_user={request.from_user}, from_group={request.from_group}, error={str(e)}"
+        )
+        raise HTTPException(status_code=500, detail=f"寮哄埗澶勭悊鑱氬悎娑堟伅澶辫触: {str(e)}")
+
+
+@router.get("/config")
+async def get_aggregation_config():
+    """
+    Get the message aggregation configuration
+
+    Returns:
+        Aggregation configuration information
+    """
+    try:
+        config = {
+            "enabled": message_aggregator.aggregation_enabled,
+            "timeout_seconds": message_aggregator.aggregation_timeout,
+        }
+
+        return {"success": True, "message": "鑾峰彇鑱氬悎閰嶇疆鎴愬姛", "data": config}
+
+    except Exception as e:
+        logger.error(f"鑾峰彇鑱氬悎閰嶇疆寮傚父: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"鑾峰彇鑱氬悎閰嶇疆澶辫触: {str(e)}")
+
+
+@router.get("/health")
+async def aggregation_health_check():
+    """
+    Health check for the message aggregation service
+
+    Returns:
+        Health status
+    """
+    try:
+        status = message_aggregator.get_aggregation_status()
+
+        return {
+            "success": True,
+            "message": "娑堟伅鑱氬悎鏈嶅姟杩愯姝e父",
+            "data": {
+                "service_status": "healthy",
+                "enabled": status["enabled"],
+                "active_aggregations": status["active_aggregations"],
+            },
+        }
+
+    except Exception as e:
+        logger.error(f"鑱氬悎鏈嶅姟鍋ュ悍妫�鏌ュ紓甯�: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"鑱氬悎鏈嶅姟鍋ュ悍妫�鏌ュけ璐�: {str(e)}")
diff --git a/app/services/ecloud_client.py b/app/services/ecloud_client.py
index b8a64ef..0a26cac 100644
--- a/app/services/ecloud_client.py
+++ b/app/services/ecloud_client.py
@@ -38,7 +38,8 @@
             if keyword in filtered_content:
                 filtered_content = filtered_content.replace(keyword, "")
                 logger.info(f"杩囨护鍏抽敭璇�: {keyword}")
-
+        # Strip leading and trailing whitespace
+        filtered_content = filtered_content.strip()
         return filtered_content
 
     def get_contact_info(self, w_id: str, wc_id: str) -> Optional[Dict[str, Any]]:
diff --git a/app/services/message_aggregator.py b/app/services/message_aggregator.py
new file mode 100644
index 0000000..9dbe3e0
--- /dev/null
+++ b/app/services/message_aggregator.py
@@ -0,0 +1,328 @@
+"""
+Message aggregation service
+Aggregates multiple messages sent by the same user in the same group within a short time window
+"""
+
+import time
+import threading
+import json
+from typing import Dict, List, Optional, Any, Tuple
+from loguru import logger
+from dataclasses import dataclass, field
+from datetime import datetime, timedelta
+
+from config import settings
+
+
+@dataclass
+class PendingMessage:
+    """寰呰仛鍚堢殑娑堟伅"""
+    message_data: Dict[str, Any]
+    timestamp: float
+    content: str
+    from_user: str
+    from_group: str
+
+
+@dataclass
+class MessageAggregation:
+    """娑堟伅鑱氬悎瀵硅薄"""
+    from_user: str
+    from_group: str
+    messages: List[PendingMessage] = field(default_factory=list)
+    first_message_time: float = 0.0
+    last_message_time: float = 0.0
+    timer: Optional[threading.Timer] = None
+    lock: threading.Lock = field(default_factory=threading.Lock)
+
+    def add_message(self, message: PendingMessage):
+        """娣诲姞娑堟伅鍒拌仛鍚堜腑"""
+        with self.lock:
+            self.messages.append(message)
+            if not self.first_message_time:
+                self.first_message_time = message.timestamp
+            self.last_message_time = message.timestamp
+
+    def get_aggregated_content(self) -> str:
+        """鑾峰彇鑱氬悎鍚庣殑娑堟伅鍐呭"""
+        with self.lock:
+            if not self.messages:
+                return ""
+            
+            # Sort messages by timestamp
+            sorted_messages = sorted(self.messages, key=lambda x: x.timestamp)
+            
+            # Merge the message contents
+            contents = [msg.content for msg in sorted_messages]
+            return "\n".join(contents)
+
+    def get_latest_message_data(self) -> Optional[Dict[str, Any]]:
+        """鑾峰彇鏈�鏂扮殑娑堟伅鏁版嵁浣滀负妯℃澘"""
+        with self.lock:
+            if not self.messages:
+                return None
+            
+            # Return the data of the most recent message as the template
+            latest_message = max(self.messages, key=lambda x: x.timestamp)
+            return latest_message.message_data.copy()
+
+    def clear(self):
+        """娓呯┖鑱氬悎鏁版嵁"""
+        with self.lock:
+            self.messages.clear()
+            self.first_message_time = 0.0
+            self.last_message_time = 0.0
+            if self.timer:
+                self.timer.cancel()
+                self.timer = None
+
+
+class MessageAggregatorService:
+    """娑堟伅鑱氬悎鏈嶅姟"""
+
+    def __init__(self):
+        self.aggregations: Dict[str, MessageAggregation] = {}
+        self.lock = threading.Lock()
+        self.aggregation_timeout = getattr(settings, 'message_aggregation_timeout', 15)  # default: 15 seconds
+        self.aggregation_enabled = getattr(settings, 'message_aggregation_enabled', True)  # enabled by default
+
+        logger.info(f"Message aggregation service initialized: timeout={self.aggregation_timeout}s, enabled={self.aggregation_enabled}")
+
+    def _get_aggregation_key(self, from_user: str, from_group: str) -> str:
+        """鑾峰彇鑱氬悎閿�"""
+        return f"{from_group}:{from_user}"
+
+    def should_aggregate_message(self, message_data: Dict[str, Any]) -> bool:
+        """
+        Decide whether a message should be aggregated
+
+        Args:
+            message_data: Message data
+
+        Returns:
+            Whether the message should be aggregated
+        """
+        if not self.aggregation_enabled:
+            return False
+            
+        # Check that the message type is a group chat message
+        message_type = message_data.get("messageType")
+        if message_type != "80001":
+            return False
+            
+        data = message_data.get("data", {})
+        
+        # Check whether the message was sent by ourselves
+        if data.get("self", False):
+            return False
+            
+        # Check required fields
+        if not data.get("fromUser") or not data.get("fromGroup") or not data.get("content"):
+            return False
+            
+        return True
+
+    def add_message_to_aggregation(self, message_data: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
+        """
+        Add a message to the aggregation
+
+        Args:
+            message_data: Message data
+
+        Returns:
+            (needs_immediate_processing, aggregated message data)
+            - (False, None): the message was buffered for aggregation and needs no immediate processing
+            - (True, message_data): the message does not qualify for aggregation and should be processed immediately
+        """
+        if not self.should_aggregate_message(message_data):
+            # No aggregation needed; process directly
+            return True, message_data
+            
+        data = message_data.get("data", {})
+        from_user = data.get("fromUser")
+        from_group = data.get("fromGroup")
+        content = data.get("content", "")
+        
+        aggregation_key = self._get_aggregation_key(from_user, from_group)
+        current_time = time.time()
+        
+        pending_message = PendingMessage(
+            message_data=message_data,
+            timestamp=current_time,
+            content=content,
+            from_user=from_user,
+            from_group=from_group
+        )
+        
+        with self.lock:
+            if aggregation_key not in self.aggregations:
+                # Create a new aggregation
+                self.aggregations[aggregation_key] = MessageAggregation(
+                    from_user=from_user,
+                    from_group=from_group
+                )
+                
+            aggregation = self.aggregations[aggregation_key]
+            
+        # Cancel the previous timer
+        with aggregation.lock:
+            if aggregation.timer:
+                aggregation.timer.cancel()
+                
+        # Add the message to the aggregation
+        aggregation.add_message(pending_message)
+        
+        # Set a new timer
+        timer = threading.Timer(
+            self.aggregation_timeout,
+            self._process_aggregated_messages,
+            args=(aggregation_key,)
+        )
+        
+        with aggregation.lock:
+            aggregation.timer = timer
+            
+        timer.start()
+        
+        logger.info(f"娑堟伅宸叉坊鍔犲埌鑱氬悎: user={from_user}, group={from_group}, "
+                   f"total_messages={len(aggregation.messages)}, timeout={self.aggregation_timeout}s")
+        
+        # The message has been aggregated; no immediate processing needed
+        return False, None
+
+    def _process_aggregated_messages(self, aggregation_key: str):
+        """
+        Process the aggregated messages (timer callback)
+
+        Args:
+            aggregation_key: Aggregation key
+        """
+        try:
+            with self.lock:
+                if aggregation_key not in self.aggregations:
+                    logger.warning(f"鑱氬悎閿笉瀛樺湪: {aggregation_key}")
+                    return
+                    
+                aggregation = self.aggregations[aggregation_key]
+                
+            # Get the aggregated message data
+            aggregated_content = aggregation.get_aggregated_content()
+            latest_message_data = aggregation.get_latest_message_data()
+            
+            if not aggregated_content or not latest_message_data:
+                logger.warning(f"鑱氬悎鏁版嵁涓虹┖: {aggregation_key}")
+                self._cleanup_aggregation(aggregation_key)
+                return
+                
+            # Build the aggregated message data
+            aggregated_message_data = latest_message_data.copy()
+            aggregated_message_data["data"]["content"] = aggregated_content
+            
+            message_count = len(aggregation.messages)
+            logger.info(f"澶勭悊鑱氬悎娑堟伅: user={aggregation.from_user}, group={aggregation.from_group}, "
+                       f"message_count={message_count}, aggregated_content_length={len(aggregated_content)}")
+            
+            # Clean up the aggregation data
+            self._cleanup_aggregation(aggregation_key)
+            
+            # Call the message processor on the aggregated message (lazy import to avoid a circular dependency)
+            from app.services.message_processor import message_processor
+            success = message_processor.process_single_message(aggregated_message_data)
+            
+            if success:
+                logger.info(f"鑱氬悎娑堟伅澶勭悊鎴愬姛: user={aggregation.from_user}, group={aggregation.from_group}")
+            else:
+                logger.error(f"鑱氬悎娑堟伅澶勭悊澶辫触: user={aggregation.from_user}, group={aggregation.from_group}")
+                
+        except Exception as e:
+            logger.error(f"澶勭悊鑱氬悎娑堟伅寮傚父: aggregation_key={aggregation_key}, error={str(e)}")
+            self._cleanup_aggregation(aggregation_key)
+
+    def _cleanup_aggregation(self, aggregation_key: str):
+        """
+        Clean up the aggregation data
+
+        Args:
+            aggregation_key: Aggregation key
+        """
+        try:
+            with self.lock:
+                if aggregation_key in self.aggregations:
+                    aggregation = self.aggregations[aggregation_key]
+                    aggregation.clear()
+                    del self.aggregations[aggregation_key]
+                    logger.debug(f"鑱氬悎鏁版嵁宸叉竻鐞�: {aggregation_key}")
+        except Exception as e:
+            logger.error(f"娓呯悊鑱氬悎鏁版嵁寮傚父: aggregation_key={aggregation_key}, error={str(e)}")
+
+    def get_aggregation_status(self) -> Dict[str, Any]:
+        """
+        Get aggregation status information
+
+        Returns:
+            Aggregation status information
+        """
+        with self.lock:
+            status = {
+                "enabled": self.aggregation_enabled,
+                "timeout": self.aggregation_timeout,
+                "active_aggregations": len(self.aggregations),
+                "aggregations": {}
+            }
+            
+            for key, aggregation in self.aggregations.items():
+                with aggregation.lock:
+                    status["aggregations"][key] = {
+                        "from_user": aggregation.from_user,
+                        "from_group": aggregation.from_group,
+                        "message_count": len(aggregation.messages),
+                        "first_message_time": aggregation.first_message_time,
+                        "last_message_time": aggregation.last_message_time,
+                        "elapsed_time": time.time() - aggregation.first_message_time if aggregation.first_message_time else 0
+                    }
+                    
+            return status
+
+    def force_process_aggregation(self, from_user: str, from_group: str) -> bool:
+        """
+        Force-process the aggregated messages for a given user and group
+
+        Args:
+            from_user: User ID
+            from_group: Group ID
+
+        Returns:
+            Whether the messages were processed
+        """
+        aggregation_key = self._get_aggregation_key(from_user, from_group)
+        
+        with self.lock:
+            if aggregation_key not in self.aggregations:
+                logger.warning(f"娌℃湁鎵惧埌寰呰仛鍚堢殑娑堟伅: user={from_user}, group={from_group}")
+                return False
+                
+        logger.info(f"寮哄埗澶勭悊鑱氬悎娑堟伅: user={from_user}, group={from_group}")
+        self._process_aggregated_messages(aggregation_key)
+        return True
+
+    def stop(self):
+        """鍋滄鑱氬悎鏈嶅姟"""
+        logger.info("姝e湪鍋滄娑堟伅鑱氬悎鏈嶅姟...")
+        
+        with self.lock:
+            for aggregation_key, aggregation in self.aggregations.items():
+                try:
+                    with aggregation.lock:
+                        if aggregation.timer:
+                            aggregation.timer.cancel()
+                    logger.debug(f"宸插彇娑堣仛鍚堝畾鏃跺櫒: {aggregation_key}")
+                except Exception as e:
+                    logger.error(f"鍙栨秷鑱氬悎瀹氭椂鍣ㄥ紓甯�: {aggregation_key}, error={str(e)}")
+                    
+            self.aggregations.clear()
+            
+        logger.info("娑堟伅鑱氬悎鏈嶅姟宸插仠姝�")
+
+
+# Global message aggregation service instance
+message_aggregator = MessageAggregatorService()
diff --git a/app/services/message_processor.py b/app/services/message_processor.py
index 11b1bf6..5ac473c 100644
--- a/app/services/message_processor.py
+++ b/app/services/message_processor.py
@@ -5,6 +5,7 @@
 import json
 import time
 import re
+import xml.etree.ElementTree as ET
 from typing import Dict, Any, Optional, List, Tuple
 from datetime import datetime
 from sqlalchemy.orm import Session
@@ -27,6 +28,69 @@
 
     def __init__(self):
         pass
+
+    def extract_refer_message_content(self, callback_data: Dict[str, Any]) -> str:
+        """
+        Extract the content of a quoted message (message_type 80014)
+
+        Args:
+            callback_data: Callback data
+
+        Returns:
+            Combined message content (quoted content + title, separated by a blank line)
+        """
+        try:
+            data = callback_data.get("data", {})
+            content = data.get("content", "")
+            title = data.get("title", "")
+
+            # Parse the XML and extract the text of the msg>appmsg>refermsg>content element
+            refer_content = ""
+            xml_title = ""
+            try:
+                # Parse the XML
+                root = ET.fromstring(content)
+
+                # Look up the msg>appmsg>refermsg>content path
+                appmsg = root.find("appmsg")
+                if appmsg is not None:
+                    # Extract the title from the XML (if present)
+                    title_element = appmsg.find("title")
+                    if title_element is not None and title_element.text:
+                        xml_title = title_element.text.strip()
+
+                    # Extract the quoted message content
+                    refermsg = appmsg.find("refermsg")
+                    if refermsg is not None:
+                        content_element = refermsg.find("content")
+                        if content_element is not None and content_element.text:
+                            refer_content = content_element.text.strip()
+
+            except ET.ParseError as e:
+                logger.warning(f"XML瑙f瀽澶辫触: {str(e)}, content={content}")
+                # 濡傛灉XML瑙f瀽澶辫触锛屼娇鐢ㄥ師濮媍ontent
+                refer_content = content
+
+            # Determine the final title: prefer the title from the XML, otherwise fall back to data.title
+            final_title = xml_title if xml_title else title
+
+            # Combine the content: refer_content first, then final_title, separated by a blank line
+            if refer_content and final_title:
+                combined_content = f"{refer_content}\n\n{final_title}"
+            elif refer_content:
+                combined_content = refer_content
+            elif final_title:
+                combined_content = final_title
+            else:
+                combined_content = content  # If neither exists, use the original content
+
+            logger.info(f"寮曠敤娑堟伅鍐呭鎻愬彇瀹屾垚: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}")
+            return combined_content
+
+        except Exception as e:
+            logger.error(f"鎻愬彇寮曠敤娑堟伅鍐呭寮傚父: error={str(e)}")
+            # 寮傚父鎯呭喌涓嬭繑鍥炲師濮媍ontent
+            return callback_data.get("data", {}).get("content", "")
 
     def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]:
         """
@@ -83,9 +147,10 @@
         Returns:
             鏄惁鏄湁鏁堢殑缇よ亰娑堟伅
         """
-        # Check that the message type is a group chat message (80001)
+        # Check whether the message type is a group chat message (80001) or a quoted message (80014)
         message_type = callback_data.get("messageType")
-        if message_type != "80001":
+        print(f"data: {callback_data}")
+        if message_type not in ["80001", "80014"]:
             logger.info(f"蹇界暐闈炵兢鑱婃秷鎭�: messageType={message_type}")
             return False
 
@@ -150,11 +215,23 @@
         if not self.is_valid_group_message(callback_data):
             return False
 
-        data = callback_data.get("data", {})
-        from_user = data.get("fromUser")
+        # Import the message aggregation service (lazy import to avoid a circular dependency)
+        from app.services.message_aggregator import message_aggregator
 
-        # Serialize the whole callback payload to a JSON string and store it in the Redis queue
-        return redis_queue.enqueue_message(from_user, callback_data)
+        # Try to add the message to the aggregation
+        should_process_immediately, aggregated_data = message_aggregator.add_message_to_aggregation(callback_data)
+
+        if should_process_immediately:
+            # Needs immediate processing (not aggregated, or aggregation timed out)
+            data = aggregated_data.get("data", {}) if aggregated_data else callback_data.get("data", {})
+            from_user = data.get("fromUser")
+
+            # Enqueue the message into the Redis queue
+            return redis_queue.enqueue_message(from_user, aggregated_data or callback_data)
+        else:
+            # The message has been aggregated; no immediate processing needed
+            logger.info(f"Message added to the aggregation queue, waiting for aggregated processing: fromUser={callback_data.get('data', {}).get('fromUser')}")
+            return True
 
     def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool:
         """
@@ -230,8 +307,15 @@
             from_group = data.get("fromGroup")
             content = data.get("content")
             w_id = data.get("wId")
+            message_type = message_data.get("messageType")
 
-            logger.info(f"寮�濮嬪鐞嗘秷鎭�: from_user={from_user}, from_group={from_group}")
+            logger.info(f"寮�濮嬪鐞嗘秷鎭�: from_user={from_user}, from_group={from_group}, message_type={message_type}")
+
+            # Process the content according to the message type
+            if message_type == "80014":
+                # Quoted message: extract the content from the XML and combine it with the title
+                content = self.extract_refer_message_content(message_data)
+                logger.info(f"Quoted message content processed: content_length={len(content)}")
 
             # Use a context manager to ensure the database session is handled correctly
             with next(get_db()) as db:
diff --git a/config.py b/config.py
index e0b1455..2fcbf94 100644
--- a/config.py
+++ b/config.py
@@ -111,6 +111,11 @@
         self.keyword_filter_enabled = keyword_filter_config.get("enabled", False)
         self.keyword_filter_keywords = keyword_filter_config.get("keywords", [])
 
+        # Message aggregation configuration
+        message_aggregation_config = config_data.get("message_aggregation", {})
+        self.message_aggregation_enabled = message_aggregation_config.get("enabled", True)
+        self.message_aggregation_timeout = message_aggregation_config.get("timeout_seconds", 15)
+
     def update_ecloud_w_id(self, new_w_id: str) -> bool:
         """
         鍔ㄦ�佹洿鏂癊浜戠瀹剁殑w_id閰嶇疆
diff --git a/ecloud_dify.spec b/ecloud_dify.spec
index 528fbd5..a2ac334 100644
--- a/ecloud_dify.spec
+++ b/ecloud_dify.spec
@@ -11,7 +11,6 @@
 
 # Add configuration files
 datas.append(('config.example.json', '.'))
-datas.append(('config.production.json', '.'))
 if os.path.exists('config.json'):
     datas.append(('config.json', '.'))
 
diff --git a/logs/app.log b/logs/app.log
index 78fc2ba..4f17013 100644
--- a/logs/app.log
+++ b/logs/app.log
@@ -1,19 +1,5 @@
-2025-07-28 09:54:56 | INFO | __main__:<module>:122 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 10:41:14 | INFO | __main__:<module>:122 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 10:42:46 | INFO | __main__:<module>:122 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 11:10:52 | INFO | __main__:<module>:122 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 11:23:47 | INFO | __main__:<module>:124 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 11:41:55 | INFO | __main__:<module>:124 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 11:44:17 | INFO | __main__:<module>:124 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 14:08:32 | INFO | __main__:<module>:124 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 14:31:14 | INFO | __main__:<module>:124 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 16:07:05 | INFO | __main__:<module>:139 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 16:23:36 | INFO | __main__:<module>:152 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 16:39:45 | INFO | __main__:<module>:159 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 17:17:10 | INFO | __main__:<module>:161 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 17:18:18 | INFO | __main__:<module>:161 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 17:29:25 | INFO | __main__:<module>:122 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 17:29:50 | INFO | __main__:<module>:122 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 17:34:00 | INFO | __main__:<module>:122 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 17:44:45 | INFO | __main__:<module>:122 - Starting ECloud Manager-DifyAI integration service
-2025-07-28 18:12:07 | INFO | __main__:<module>:122 - Starting ECloud Manager-DifyAI integration service
+2025-08-07 16:49:53 | INFO | __main__:<module>:132 - Starting ECloud Manager-DifyAI integration service
+2025-08-07 17:07:18 | INFO | __main__:<module>:132 - Starting ECloud Manager-DifyAI integration service
+2025-08-07 17:09:51 | INFO | __main__:<module>:132 - Starting ECloud Manager-DifyAI integration service
+2025-08-07 17:48:05 | INFO | __main__:<module>:132 - Starting ECloud Manager-DifyAI integration service
+2025-08-07 17:51:18 | INFO | __main__:<module>:132 - Starting ECloud Manager-DifyAI integration service
diff --git a/main.py b/main.py
index cba621f..e6b60b3 100644
--- a/main.py
+++ b/main.py
@@ -11,9 +11,11 @@
 from config import settings
 from app.api.callback import router as callback_router
 from app.api.friend_ignore import router as friend_ignore_router
+from app.api.message_aggregation import router as message_aggregation_router
 from app.models.database import create_tables
 from app.workers.message_worker import message_worker
 from app.services.contact_sync import contact_sync_service
+from app.services.message_aggregator import message_aggregator
 
 
 @asynccontextmanager
@@ -57,6 +59,13 @@
     # Executed on shutdown
     logger.info("Shutting down the application...")
 
+    # Stop the message aggregation service
+    try:
+        message_aggregator.stop()
+        logger.info("Message aggregation service stopped")
+    except Exception as e:
+        logger.error(f"Failed to stop the message aggregation service: {str(e)}")
+
     # Stop the message worker
     try:
         message_worker.stop()
@@ -87,6 +96,7 @@
 # Register routes
 app.include_router(callback_router, prefix="/api/v1", tags=["Callback API"])
 app.include_router(friend_ignore_router, prefix="/api/v1", tags=["Friend ignore management"])
+app.include_router(message_aggregation_router, prefix="/api/v1/aggregation", tags=["Message aggregation management"])
 
 
 @app.get("/")
diff --git a/startup.py b/startup.py
new file mode 100644
index 0000000..2f02c8d
--- /dev/null
+++ b/startup.py
@@ -0,0 +1,128 @@
+"""
+鍚姩鑴氭湰 - 澶勭悊閰嶇疆鏂囦欢鍜岀洰褰曞垵濮嬪寲
+"""
+
+import os
+import sys
+import json
+import shutil
+from pathlib import Path
+
+
+def get_exe_dir():
+    """鑾峰彇exe鏂囦欢鎵�鍦ㄧ洰褰�"""
+    if getattr(sys, "frozen", False):
+        # Running as a packaged executable
+        return os.path.dirname(sys.executable)
+    else:
+        # Running in a development environment
+        return os.path.dirname(os.path.abspath(__file__))
+
+
+def ensure_directories():
+    """纭繚蹇呰鐨勭洰褰曞瓨鍦�"""
+    exe_dir = get_exe_dir()
+
+    # Create the logs directory
+    logs_dir = os.path.join(exe_dir, "logs")
+    if not os.path.exists(logs_dir):
+        os.makedirs(logs_dir)
+        print(f"鍒涘缓鏃ュ織鐩綍: {logs_dir}")
+
+    return exe_dir
+
+
+def ensure_config_file():
+    """纭繚閰嶇疆鏂囦欢瀛樺湪"""
+    exe_dir = get_exe_dir()
+    config_file = os.path.join(exe_dir, "config.json")
+
+    if not os.path.exists(config_file):
+        # If config.json does not exist, try to copy a production config template
+        production_config = os.path.join(exe_dir, "config.production.json")
+        example_config = os.path.join(exe_dir, "config.example.json")
+
+        if os.path.exists(production_config):
+            shutil.copy2(production_config, config_file)
+            print(f"澶嶅埗鐢熶骇閰嶇疆鏂囦欢: {production_config} -> {config_file}")
+        elif os.path.exists(example_config):
+            shutil.copy2(example_config, config_file)
+            print(f"澶嶅埗绀轰緥閰嶇疆鏂囦欢: {example_config} -> {config_file}")
+        else:
+            # Create a default config file
+            default_config = {
+                "database": {
+                    "url": "mysql+pymysql://root:password@localhost:3306/ecloud_dify"
+                },
+                "redis": {"url": "redis://localhost:6379/0"},
+                "ecloud": {
+                    "base_url": "http://125.122.152.142:9899",
+                    "authorization": "your_ecloud_authorization_token",
+                    "w_id": "your_ecloud_w_id",
+                },
+                "dify": {
+                    "base_url": "https://api.dify.ai/v1",
+                    "api_key": "your_dify_api_key",
+                },
+                "server": {"host": "0.0.0.0", "port": 7979, "debug": False},
+                "logging": {"level": "INFO", "file": "logs/app.log"},
+                "message_processing": {
+                    "max_retry_count": 3,
+                    "retry_delay": 5,
+                    "queue_timeout": 300,
+                },
+                "customer_service": {"names": ["瀹㈡湇1", "瀹㈡湇2"]},
+            }
+
+            with open(config_file, "w", encoding="utf-8") as f:
+                json.dump(default_config, f, indent=2, ensure_ascii=False)
+            print(f"鍒涘缓榛樿閰嶇疆鏂囦欢: {config_file}")
+
+    return config_file
+
+
+def setup_environment():
+    """璁剧疆杩愯鐜"""
+    exe_dir = ensure_directories()
+    config_file = ensure_config_file()
+
+    # Set the working directory to the executable's directory
+    os.chdir(exe_dir)
+    print(f"璁剧疆宸ヤ綔鐩綍: {exe_dir}")
+
+    return exe_dir, config_file
+
+
+if __name__ == "__main__":
+    setup_environment()
+
+    # Import and start the main application
+    try:
+        from main import app
+        import uvicorn
+        from config import settings
+        from loguru import logger
+
+        # Configure logging
+        logger.add(
+            settings.log_file,
+            rotation="1 day",
+            retention="7 days",
+            level=settings.log_level,
+            format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
+        )
+
+        logger.info("鍚姩E浜戠瀹�-DifyAI瀵规帴鏈嶅姟")
+
+        # Start the service
+        uvicorn.run(
+            app,
+            host=settings.server_host,
+            port=settings.server_port,
+            log_level=settings.log_level.lower(),
+        )
+
+    except Exception as e:
+        print(f"鍚姩澶辫触: {e}")
+        input("鎸変换鎰忛敭閫�鍑�...")
+        sys.exit(1)
diff --git a/tests/test_message_aggregator.py b/tests/test_message_aggregator.py
new file mode 100644
index 0000000..42a7a27
--- /dev/null
+++ b/tests/test_message_aggregator.py
@@ -0,0 +1,405 @@
+"""
+Message aggregation service tests
+"""
+
+import time
+import threading
+import pytest
+from unittest.mock import patch, MagicMock
+from app.services.message_aggregator import MessageAggregatorService, PendingMessage, MessageAggregation
+
+
+class TestMessageAggregatorService:
+    """娑堟伅鑱氬悎鏈嶅姟娴嬭瘯绫�"""
+    
+    def setup_method(self):
+        """娴嬭瘯鍓嶅噯澶�"""
+        self.aggregator = MessageAggregatorService()
+        # Use a shorter timeout for tests
+        self.aggregator.aggregation_timeout = 2
+        self.aggregator.aggregation_enabled = True
+    
+    def teardown_method(self):
+        """娴嬭瘯鍚庢竻鐞�"""
+        self.aggregator.stop()
+    
+    def test_should_aggregate_message_valid(self):
+        """娴嬭瘯鏈夋晥娑堟伅鐨勮仛鍚堝垽鏂�"""
+        message_data = {
+            "messageType": "80001",
+            "data": {
+                "fromUser": "wxid_test123",
+                "fromGroup": "group123@chatroom",
+                "content": "娴嬭瘯娑堟伅",
+                "self": False
+            }
+        }
+        
+        result = self.aggregator.should_aggregate_message(message_data)
+        assert result is True
+    
+    def test_should_aggregate_message_disabled(self):
+        """娴嬭瘯鑱氬悎鍔熻兘绂佺敤鏃剁殑鍒ゆ柇"""
+        self.aggregator.aggregation_enabled = False
+        
+        message_data = {
+            "messageType": "80001",
+            "data": {
+                "fromUser": "wxid_test123",
+                "fromGroup": "group123@chatroom",
+                "content": "娴嬭瘯娑堟伅",
+                "self": False
+            }
+        }
+        
+        result = self.aggregator.should_aggregate_message(message_data)
+        assert result is False
+    
+    def test_should_aggregate_message_wrong_type(self):
+        """娴嬭瘯閿欒娑堟伅绫诲瀷鐨勮仛鍚堝垽鏂�"""
+        message_data = {
+            "messageType": "80002",  # 闈炵兢鑱婃秷鎭�
+            "data": {
+                "fromUser": "wxid_test123",
+                "fromGroup": "group123@chatroom",
+                "content": "娴嬭瘯娑堟伅",
+                "self": False
+            }
+        }
+        
+        result = self.aggregator.should_aggregate_message(message_data)
+        assert result is False
+    
+    def test_should_aggregate_message_self_message(self):
+        """娴嬭瘯鑷繁鍙戦�佺殑娑堟伅鐨勮仛鍚堝垽鏂�"""
+        message_data = {
+            "messageType": "80001",
+            "data": {
+                "fromUser": "wxid_test123",
+                "fromGroup": "group123@chatroom",
+                "content": "娴嬭瘯娑堟伅",
+                "self": True  # 鑷繁鍙戦�佺殑娑堟伅
+            }
+        }
+        
+        result = self.aggregator.should_aggregate_message(message_data)
+        assert result is False
+    
+    def test_should_aggregate_message_missing_fields(self):
+        """娴嬭瘯缂哄皯蹇呰瀛楁鐨勬秷鎭殑鑱氬悎鍒ゆ柇"""
+        message_data = {
+            "messageType": "80001",
+            "data": {
+                "fromUser": "wxid_test123",
+                # fromGroup is missing
+                "content": "娴嬭瘯娑堟伅",
+                "self": False
+            }
+        }
+        
+        result = self.aggregator.should_aggregate_message(message_data)
+        assert result is False
+    
+    def test_add_message_to_aggregation_no_aggregation(self):
+        """娴嬭瘯涓嶉渶瑕佽仛鍚堢殑娑堟伅"""
+        self.aggregator.aggregation_enabled = False
+        
+        message_data = {
+            "messageType": "80001",
+            "data": {
+                "fromUser": "wxid_test123",
+                "fromGroup": "group123@chatroom",
+                "content": "娴嬭瘯娑堟伅",
+                "self": False
+            }
+        }
+        
+        should_process, aggregated_data = self.aggregator.add_message_to_aggregation(message_data)
+        
+        assert should_process is True
+        assert aggregated_data == message_data
+    
+    def test_add_message_to_aggregation_first_message(self):
+        """娴嬭瘯娣诲姞绗竴鏉℃秷鎭埌鑱氬悎"""
+        message_data = {
+            "messageType": "80001",
+            "data": {
+                "fromUser": "wxid_test123",
+                "fromGroup": "group123@chatroom",
+                "content": "绗竴鏉℃秷鎭�",
+                "self": False
+            }
+        }
+        
+        should_process, aggregated_data = self.aggregator.add_message_to_aggregation(message_data)
+        
+        assert should_process is False
+        assert aggregated_data is None
+        
+        # Check the aggregation status
+        status = self.aggregator.get_aggregation_status()
+        assert status["active_aggregations"] == 1
+        assert "group123@chatroom:wxid_test123" in status["aggregations"]
+    
+    def test_add_message_to_aggregation_multiple_messages(self):
+        """娴嬭瘯娣诲姞澶氭潯娑堟伅鍒拌仛鍚�"""
+        message_data_1 = {
+            "messageType": "80001",
+            "data": {
+                "fromUser": "wxid_test123",
+                "fromGroup": "group123@chatroom",
+                "content": "绗竴鏉℃秷鎭�",
+                "self": False
+            }
+        }
+        
+        message_data_2 = {
+            "messageType": "80001",
+            "data": {
+                "fromUser": "wxid_test123",
+                "fromGroup": "group123@chatroom",
+                "content": "绗簩鏉℃秷鎭�",
+                "self": False
+            }
+        }
+        
+        # Add the first message
+        should_process_1, _ = self.aggregator.add_message_to_aggregation(message_data_1)
+        assert should_process_1 is False
+        
+        # Add the second message
+        should_process_2, _ = self.aggregator.add_message_to_aggregation(message_data_2)
+        assert should_process_2 is False
+        
+        # Check the aggregation status
+        status = self.aggregator.get_aggregation_status()
+        aggregation_info = status["aggregations"]["group123@chatroom:wxid_test123"]
+        assert aggregation_info["message_count"] == 2
+    
+    @patch('app.services.message_processor.message_processor')
+    def test_process_aggregated_messages_timeout(self, mock_message_processor):
+        """娴嬭瘯鑱氬悎娑堟伅瓒呮椂澶勭悊"""
+        mock_message_processor.process_single_message.return_value = True
+        
+        message_data = {
+            "messageType": "80001",
+            "data": {
+                "fromUser": "wxid_test123",
+                "fromGroup": "group123@chatroom",
+                "content": "娴嬭瘯娑堟伅",
+                "self": False
+            }
+        }
+        
+        # Add the message to the aggregation
+        should_process, _ = self.aggregator.add_message_to_aggregation(message_data)
+        assert should_process is False
+        
+        # Wait for the timeout processing
+        time.sleep(2.5)
+        
+        # Verify the message processor was called
+        mock_message_processor.process_single_message.assert_called_once()
+        
+        # Check that the aggregation was cleaned up
+        status = self.aggregator.get_aggregation_status()
+        assert status["active_aggregations"] == 0
+    
+    def test_get_aggregation_status(self):
+        """娴嬭瘯鑾峰彇鑱氬悎鐘舵��"""
+        # Initial state
+        status = self.aggregator.get_aggregation_status()
+        assert status["enabled"] is True
+        assert status["timeout"] == 2
+        assert status["active_aggregations"] == 0
+        assert status["aggregations"] == {}
+        
+        # State after adding a message
+        message_data = {
+            "messageType": "80001",
+            "data": {
+                "fromUser": "wxid_test123",
+                "fromGroup": "group123@chatroom",
+                "content": "娴嬭瘯娑堟伅",
+                "self": False
+            }
+        }
+        
+        self.aggregator.add_message_to_aggregation(message_data)
+        
+        status = self.aggregator.get_aggregation_status()
+        assert status["active_aggregations"] == 1
+        assert "group123@chatroom:wxid_test123" in status["aggregations"]
+        
+        aggregation_info = status["aggregations"]["group123@chatroom:wxid_test123"]
+        assert aggregation_info["from_user"] == "wxid_test123"
+        assert aggregation_info["from_group"] == "group123@chatroom"
+        assert aggregation_info["message_count"] == 1
+    
+    @patch('app.services.message_processor.message_processor')
+    def test_force_process_aggregation(self, mock_message_processor):
+        """娴嬭瘯寮哄埗澶勭悊鑱氬悎娑堟伅"""
+        mock_message_processor.process_single_message.return_value = True
+        
+        message_data = {
+            "messageType": "80001",
+            "data": {
+                "fromUser": "wxid_test123",
+                "fromGroup": "group123@chatroom",
+                "content": "娴嬭瘯娑堟伅",
+                "self": False
+            }
+        }
+        
+        # Add the message to the aggregation
+        self.aggregator.add_message_to_aggregation(message_data)
+        
+        # Force-process the aggregation
+        result = self.aggregator.force_process_aggregation("wxid_test123", "group123@chatroom")
+        assert result is True
+        
+        # Verify the message processor was called
+        mock_message_processor.process_single_message.assert_called_once()
+        
+        # Check that the aggregation was cleaned up
+        status = self.aggregator.get_aggregation_status()
+        assert status["active_aggregations"] == 0
+    
+    def test_force_process_aggregation_not_found(self):
+        """娴嬭瘯寮哄埗澶勭悊涓嶅瓨鍦ㄧ殑鑱氬悎"""
+        result = self.aggregator.force_process_aggregation("wxid_test123", "group123@chatroom")
+        assert result is False
+
+
+class TestPendingMessage:
+    """寰呰仛鍚堟秷鎭祴璇曠被"""
+    
+    def test_pending_message_creation(self):
+        """娴嬭瘯寰呰仛鍚堟秷鎭垱寤�"""
+        message_data = {"test": "data"}
+        timestamp = time.time()
+        
+        pending_message = PendingMessage(
+            message_data=message_data,
+            timestamp=timestamp,
+            content="test content",
+            from_user="wxid_test123",
+            from_group="group123@chatroom"
+        )
+        
+        assert pending_message.message_data == message_data
+        assert pending_message.timestamp == timestamp
+        assert pending_message.content == "test content"
+        assert pending_message.from_user == "wxid_test123"
+        assert pending_message.from_group == "group123@chatroom"
+
+
+class TestMessageAggregation:
+    """娑堟伅鑱氬悎瀵硅薄娴嬭瘯绫�"""
+    
+    def test_message_aggregation_creation(self):
+        """娴嬭瘯娑堟伅鑱氬悎瀵硅薄鍒涘缓"""
+        aggregation = MessageAggregation(
+            from_user="wxid_test123",
+            from_group="group123@chatroom"
+        )
+        
+        assert aggregation.from_user == "wxid_test123"
+        assert aggregation.from_group == "group123@chatroom"
+        assert len(aggregation.messages) == 0
+        assert aggregation.first_message_time == 0.0
+        assert aggregation.last_message_time == 0.0
+        assert aggregation.timer is None
+    
+    def test_add_message(self):
+        """娴嬭瘯娣诲姞娑堟伅鍒拌仛鍚�"""
+        aggregation = MessageAggregation(
+            from_user="wxid_test123",
+            from_group="group123@chatroom"
+        )
+        
+        timestamp = time.time()
+        pending_message = PendingMessage(
+            message_data={"test": "data"},
+            timestamp=timestamp,
+            content="test content",
+            from_user="wxid_test123",
+            from_group="group123@chatroom"
+        )
+        
+        aggregation.add_message(pending_message)
+        
+        assert len(aggregation.messages) == 1
+        assert aggregation.first_message_time == timestamp
+        assert aggregation.last_message_time == timestamp
+    
+    def test_get_aggregated_content(self):
+        """娴嬭瘯鑾峰彇鑱氬悎鍚庣殑鍐呭"""
+        aggregation = MessageAggregation(
+            from_user="wxid_test123",
+            from_group="group123@chatroom"
+        )
+        
+        # Add multiple messages
+        for i in range(3):
+            pending_message = PendingMessage(
+                message_data={"test": f"data{i}"},
+                timestamp=time.time() + i,
+                content=f"娑堟伅{i+1}",
+                from_user="wxid_test123",
+                from_group="group123@chatroom"
+            )
+            aggregation.add_message(pending_message)
+        
+        aggregated_content = aggregation.get_aggregated_content()
+        assert aggregated_content == "message 1\nmessage 2\nmessage 3"
+    
+    def test_get_latest_message_data(self):
+        """娴嬭瘯鑾峰彇鏈�鏂版秷鎭暟鎹�"""
+        aggregation = MessageAggregation(
+            from_user="wxid_test123",
+            from_group="group123@chatroom"
+        )
+        
+        # Add multiple messages
+        for i in range(3):
+            pending_message = PendingMessage(
+                message_data={"test": f"data{i}", "timestamp": i},
+                timestamp=time.time() + i,
+                content=f"娑堟伅{i+1}",
+                from_user="wxid_test123",
+                from_group="group123@chatroom"
+            )
+            aggregation.add_message(pending_message)
+        
+        latest_data = aggregation.get_latest_message_data()
+        assert latest_data["test"] == "data2"  # 鏈�鍚庝竴鏉℃秷鎭�
+        assert latest_data["timestamp"] == 2
+    
+    def test_clear(self):
+        """娴嬭瘯娓呯┖鑱氬悎鏁版嵁"""
+        aggregation = MessageAggregation(
+            from_user="wxid_test123",
+            from_group="group123@chatroom"
+        )
+        
+        # Add a message
+        pending_message = PendingMessage(
+            message_data={"test": "data"},
+            timestamp=time.time(),
+            content="test content",
+            from_user="wxid_test123",
+            from_group="group123@chatroom"
+        )
+        aggregation.add_message(pending_message)
+        
+        # Set a timer
+        aggregation.timer = threading.Timer(1, lambda: None)
+        
+        # Clear the aggregation
+        aggregation.clear()
+        
+        assert len(aggregation.messages) == 0
+        assert aggregation.first_message_time == 0.0
+        assert aggregation.last_message_time == 0.0
+        assert aggregation.timer is None

--
Gitblit v1.9.1