yj
2025-08-07 2e391d599d08ea7a7c11442bc2845a1191494c3d
新增消息聚合,短时间内同一用户发送的消息聚合后才发送给AI
4个文件已添加
6个文件已修改
1141 ■■■■■ 已修改文件
app/api/message_aggregation.py 139 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/services/ecloud_client.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/services/message_aggregator.py 328 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/services/message_processor.py 98 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ecloud_dify.spec 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
logs/app.log 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
startup.py 128 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tests/test_message_aggregator.py 405 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/api/message_aggregation.py
New file
@@ -0,0 +1,139 @@
"""
消息聚合管理接口
"""
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Dict, Any, Optional
from loguru import logger
from app.services.message_aggregator import message_aggregator
router = APIRouter()
class AggregationStatusResponse(BaseModel):
    """Response model for the aggregation status endpoint."""
    success: bool  # True when the status was retrieved without error
    message: str  # human-readable result message
    data: Dict[str, Any]  # raw payload from MessageAggregatorService.get_aggregation_status()
class ForceProcessRequest(BaseModel):
    """Request model for forcing a pending aggregation to flush."""
    from_user: str  # sender id whose buffered messages should be flushed
    from_group: str  # group id of the aggregation to flush
class ForceProcessResponse(BaseModel):
    """Response model for the force-process endpoint."""
    success: bool  # True when the request itself was handled without error
    message: str  # human-readable result message
    processed: bool  # True when a pending aggregation existed and was flushed
@router.get("/status", response_model=AggregationStatusResponse)
async def get_aggregation_status():
    """Return the aggregator's current status (config + active buffers).

    Raises:
        HTTPException: 500 when the status snapshot cannot be produced.
    """
    try:
        snapshot = message_aggregator.get_aggregation_status()
        return AggregationStatusResponse(
            success=True,
            message="获取聚合状态成功",
            data=snapshot,
        )
    except Exception as e:
        logger.error(f"获取聚合状态异常: {str(e)}")
        raise HTTPException(status_code=500, detail=f"获取聚合状态失败: {str(e)}")
@router.post("/force-process", response_model=ForceProcessResponse)
async def force_process_aggregation(request: ForceProcessRequest):
    """Immediately flush the pending aggregation for one user/group pair.

    Args:
        request: identifies the (from_user, from_group) aggregation to flush.

    Returns:
        ForceProcessResponse with `processed` indicating whether anything
        was actually pending.

    Raises:
        HTTPException: 500 when the flush attempt itself fails.
    """
    try:
        logger.info(
            f"强制处理聚合消息请求: from_user={request.from_user}, from_group={request.from_group}"
        )
        was_processed = message_aggregator.force_process_aggregation(
            request.from_user, request.from_group
        )
        result_message = "聚合消息处理成功" if was_processed else "没有找到待聚合的消息"
        return ForceProcessResponse(
            success=True, message=result_message, processed=was_processed
        )
    except Exception as e:
        logger.error(
            f"强制处理聚合消息异常: from_user={request.from_user}, from_group={request.from_group}, error={str(e)}"
        )
        raise HTTPException(status_code=500, detail=f"强制处理聚合消息失败: {str(e)}")
@router.get("/config")
async def get_aggregation_config():
    """Return the aggregator's runtime configuration (enabled flag + timeout).

    Raises:
        HTTPException: 500 when the configuration cannot be read.
    """
    try:
        return {
            "success": True,
            "message": "获取聚合配置成功",
            "data": {
                "enabled": message_aggregator.aggregation_enabled,
                "timeout_seconds": message_aggregator.aggregation_timeout,
            },
        }
    except Exception as e:
        logger.error(f"获取聚合配置异常: {str(e)}")
        raise HTTPException(status_code=500, detail=f"获取聚合配置失败: {str(e)}")
@router.get("/health")
async def aggregation_health_check():
    """Liveness probe: reports healthy when a status snapshot can be taken.

    Raises:
        HTTPException: 500 when the aggregator cannot be queried.
    """
    try:
        status = message_aggregator.get_aggregation_status()
        health_data = {
            "service_status": "healthy",
            "enabled": status["enabled"],
            "active_aggregations": status["active_aggregations"],
        }
        return {"success": True, "message": "消息聚合服务运行正常", "data": health_data}
    except Exception as e:
        logger.error(f"聚合服务健康检查异常: {str(e)}")
        raise HTTPException(status_code=500, detail=f"聚合服务健康检查失败: {str(e)}")
app/services/ecloud_client.py
@@ -38,7 +38,8 @@
            if keyword in filtered_content:
                filtered_content = filtered_content.replace(keyword, "")
                logger.info(f"过滤关键词: {keyword}")
        # 去除前后空格
        filtered_content = filtered_content.strip()
        return filtered_content
    def get_contact_info(self, w_id: str, wc_id: str) -> Optional[Dict[str, Any]]:
app/services/message_aggregator.py
New file
@@ -0,0 +1,328 @@
"""
消息聚合服务
实现同一群组中同一用户短时间内多条消息的聚合功能
"""
import time
import threading
import json
from typing import Dict, List, Optional, Any, Tuple
from loguru import logger
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from config import settings
@dataclass
class PendingMessage:
    """A single incoming message buffered while waiting to be aggregated."""
    message_data: Dict[str, Any]  # full original callback payload
    timestamp: float  # time.time() at which the message was buffered
    content: str  # extracted text content (data.content)
    from_user: str  # sender id (data.fromUser)
    from_group: str  # group id (data.fromGroup)
@dataclass
class MessageAggregation:
    """Buffer of messages from one user in one group, flushed by a timer."""
    from_user: str
    from_group: str
    messages: List[PendingMessage] = field(default_factory=list)
    first_message_time: float = 0.0
    last_message_time: float = 0.0
    timer: Optional[threading.Timer] = None
    lock: threading.Lock = field(default_factory=threading.Lock)

    def add_message(self, message: PendingMessage):
        """Append a message and update the first/last timestamps."""
        with self.lock:
            self.messages.append(message)
            # Keep the existing first timestamp; only set it on the first add.
            self.first_message_time = self.first_message_time or message.timestamp
            self.last_message_time = message.timestamp

    def get_aggregated_content(self) -> str:
        """Return all buffered contents joined by newlines, oldest first."""
        with self.lock:
            ordered = sorted(self.messages, key=lambda m: m.timestamp)
            return "\n".join(m.content for m in ordered)

    def get_latest_message_data(self) -> Optional[Dict[str, Any]]:
        """Return a shallow copy of the newest message's payload, or None."""
        with self.lock:
            if not self.messages:
                return None
            newest = max(self.messages, key=lambda m: m.timestamp)
            return dict(newest.message_data)

    def clear(self):
        """Reset the buffer and cancel any pending flush timer."""
        with self.lock:
            pending_timer, self.timer = self.timer, None
            if pending_timer:
                pending_timer.cancel()
            self.messages.clear()
            self.first_message_time = 0.0
            self.last_message_time = 0.0
class MessageAggregatorService:
    """Message aggregation service.

    Buffers group-chat messages per (group, user) pair. Every new message
    restarts a threading.Timer; when the timer finally fires the buffered
    contents are merged into a single message and handed to the message
    processor, so rapid-fire messages produce one AI call instead of many.
    """

    def __init__(self):
        # aggregation key -> in-flight MessageAggregation
        self.aggregations: Dict[str, MessageAggregation] = {}
        # Protects self.aggregations itself; each aggregation has its own lock.
        self.lock = threading.Lock()
        self.aggregation_timeout = getattr(settings, 'message_aggregation_timeout', 15)  # seconds, default 15
        self.aggregation_enabled = getattr(settings, 'message_aggregation_enabled', True)  # enabled by default
        logger.info(f"消息聚合服务初始化: timeout={self.aggregation_timeout}s, enabled={self.aggregation_enabled}")

    def _get_aggregation_key(self, from_user: str, from_group: str) -> str:
        """Build the dict key identifying one (group, user) aggregation."""
        return f"{from_group}:{from_user}"

    def should_aggregate_message(self, message_data: Dict[str, Any]) -> bool:
        """Decide whether a callback payload is eligible for aggregation.

        Args:
            message_data: raw callback payload.

        Returns:
            True only when aggregation is enabled, the message is a group
            chat message (type 80001), was not sent by this account, and
            carries fromUser/fromGroup/content.
        """
        if not self.aggregation_enabled:
            return False
        # Only group-chat messages (type 80001) are aggregated.
        message_type = message_data.get("messageType")
        if message_type != "80001":
            return False
        data = message_data.get("data", {})
        # Skip messages this account sent itself.
        if data.get("self", False):
            return False
        # All routing fields must be present.
        if not data.get("fromUser") or not data.get("fromGroup") or not data.get("content"):
            return False
        return True

    def add_message_to_aggregation(self, message_data: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
        """Buffer a message in its aggregation and (re)start the flush timer.

        Args:
            message_data: raw callback payload.

        Returns:
            (process_immediately, payload):
            - (True, message_data) when the message is not eligible for
              aggregation and must be processed right away;
            - (False, None) when the message was buffered and will be
              flushed later by the timer.
        """
        if not self.should_aggregate_message(message_data):
            # Not eligible: caller processes it directly.
            return True, message_data
        data = message_data.get("data", {})
        from_user = data.get("fromUser")
        from_group = data.get("fromGroup")
        content = data.get("content", "")
        aggregation_key = self._get_aggregation_key(from_user, from_group)
        current_time = time.time()
        pending_message = PendingMessage(
            message_data=message_data,
            timestamp=current_time,
            content=content,
            from_user=from_user,
            from_group=from_group
        )
        with self.lock:
            if aggregation_key not in self.aggregations:
                self.aggregations[aggregation_key] = MessageAggregation(
                    from_user=from_user,
                    from_group=from_group
                )
            aggregation = self.aggregations[aggregation_key]
        # Cancel the previous timer: each new message restarts the window.
        # NOTE(review): cancel/add/start below are not one atomic step, so a
        # timer that fires concurrently can race with this path — the callback
        # tolerates a missing/empty aggregation by logging and returning.
        with aggregation.lock:
            if aggregation.timer:
                aggregation.timer.cancel()
        aggregation.add_message(pending_message)
        timer = threading.Timer(
            self.aggregation_timeout,
            self._process_aggregated_messages,
            args=(aggregation_key,)
        )
        with aggregation.lock:
            aggregation.timer = timer
        timer.start()
        logger.info(f"消息已添加到聚合: user={from_user}, group={from_group}, "
                   f"total_messages={len(aggregation.messages)}, timeout={self.aggregation_timeout}s")
        # Buffered; nothing to process right now.
        return False, None

    def _process_aggregated_messages(self, aggregation_key: str):
        """Timer callback: merge the buffered messages and hand them off.

        Args:
            aggregation_key: key produced by _get_aggregation_key().
        """
        try:
            with self.lock:
                if aggregation_key not in self.aggregations:
                    logger.warning(f"聚合键不存在: {aggregation_key}")
                    return
                aggregation = self.aggregations[aggregation_key]
            aggregated_content = aggregation.get_aggregated_content()
            latest_message_data = aggregation.get_latest_message_data()
            if not aggregated_content or not latest_message_data:
                logger.warning(f"聚合数据为空: {aggregation_key}")
                self._cleanup_aggregation(aggregation_key)
                return
            # BUGFIX: latest_message_data is a *shallow* copy, so its nested
            # "data" dict is shared with the original callback payload.
            # Copy that dict too before overwriting "content", otherwise we
            # would mutate the buffered message (and its source) in place.
            aggregated_message_data = dict(latest_message_data)
            aggregated_message_data["data"] = dict(latest_message_data.get("data", {}))
            aggregated_message_data["data"]["content"] = aggregated_content
            message_count = len(aggregation.messages)
            logger.info(f"处理聚合消息: user={aggregation.from_user}, group={aggregation.from_group}, "
                       f"message_count={message_count}, aggregated_content_length={len(aggregated_content)}")
            # Drop the buffer before the (potentially slow) processing call.
            self._cleanup_aggregation(aggregation_key)
            # Deferred import to avoid a circular dependency with the processor.
            from app.services.message_processor import message_processor
            success = message_processor.process_single_message(aggregated_message_data)
            if success:
                logger.info(f"聚合消息处理成功: user={aggregation.from_user}, group={aggregation.from_group}")
            else:
                logger.error(f"聚合消息处理失败: user={aggregation.from_user}, group={aggregation.from_group}")
        except Exception as e:
            logger.error(f"处理聚合消息异常: aggregation_key={aggregation_key}, error={str(e)}")
            # Cleanup is idempotent, so this is safe even after a partial run.
            self._cleanup_aggregation(aggregation_key)

    def _cleanup_aggregation(self, aggregation_key: str):
        """Remove one aggregation, cancelling its timer and clearing its buffer.

        Args:
            aggregation_key: key produced by _get_aggregation_key().
        """
        try:
            with self.lock:
                if aggregation_key in self.aggregations:
                    aggregation = self.aggregations[aggregation_key]
                    aggregation.clear()
                    del self.aggregations[aggregation_key]
                    logger.debug(f"聚合数据已清理: {aggregation_key}")
        except Exception as e:
            logger.error(f"清理聚合数据异常: aggregation_key={aggregation_key}, error={str(e)}")

    def get_aggregation_status(self) -> Dict[str, Any]:
        """Return a snapshot of the aggregator: config plus per-key stats.

        Returns:
            Dict with "enabled", "timeout", "active_aggregations" and a
            per-key "aggregations" mapping of buffer statistics.
        """
        with self.lock:
            status = {
                "enabled": self.aggregation_enabled,
                "timeout": self.aggregation_timeout,
                "active_aggregations": len(self.aggregations),
                "aggregations": {}
            }
            for key, aggregation in self.aggregations.items():
                with aggregation.lock:
                    status["aggregations"][key] = {
                        "from_user": aggregation.from_user,
                        "from_group": aggregation.from_group,
                        "message_count": len(aggregation.messages),
                        "first_message_time": aggregation.first_message_time,
                        "last_message_time": aggregation.last_message_time,
                        "elapsed_time": time.time() - aggregation.first_message_time if aggregation.first_message_time else 0
                    }
            return status

    def force_process_aggregation(self, from_user: str, from_group: str) -> bool:
        """Flush one (user, group) aggregation immediately.

        Args:
            from_user: sender id.
            from_group: group id.

        Returns:
            True when a pending aggregation existed and was processed.
        """
        aggregation_key = self._get_aggregation_key(from_user, from_group)
        with self.lock:
            if aggregation_key not in self.aggregations:
                logger.warning(f"没有找到待聚合的消息: user={from_user}, group={from_group}")
                return False
        logger.info(f"强制处理聚合消息: user={from_user}, group={from_group}")
        # NOTE(review): the pending timer is not cancelled here; when it fires
        # the aggregation is already gone and the callback only logs a warning.
        self._process_aggregated_messages(aggregation_key)
        return True

    def stop(self):
        """Shut down: cancel all pending timers and drop buffered messages."""
        logger.info("正在停止消息聚合服务...")
        with self.lock:
            for aggregation_key, aggregation in self.aggregations.items():
                try:
                    with aggregation.lock:
                        if aggregation.timer:
                            aggregation.timer.cancel()
                    logger.debug(f"已取消聚合定时器: {aggregation_key}")
                except Exception as e:
                    logger.error(f"取消聚合定时器异常: {aggregation_key}, error={str(e)}")
            self.aggregations.clear()
        logger.info("消息聚合服务已停止")
# Global, module-level singleton instance of the aggregation service.
message_aggregator = MessageAggregatorService()
app/services/message_processor.py
@@ -5,6 +5,7 @@
import json
import time
import re
import xml.etree.ElementTree as ET
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime
from sqlalchemy.orm import Session
@@ -27,6 +28,69 @@
    def __init__(self):
        pass
    def extract_refer_message_content(self, callback_data: Dict[str, Any]) -> str:
        """
        提取引用消息的内容(message_type为80014)
        Args:
            callback_data: 回调数据
        Returns:
            组合后的消息内容(content + title,中间空一行)
        """
        try:
            data = callback_data.get("data", {})
            content = data.get("content", "")
            title = data.get("title", "")
            # 解析XML内容,提取msg>appmsg>refermsg>content标签中的内容
            refer_content = ""
            xml_title = ""
            try:
                # 解析XML
                root = ET.fromstring(content)
                # 查找msg>appmsg>refermsg>content路径
                appmsg = root.find("appmsg")
                if appmsg is not None:
                    # 提取XML中的title(如果存在)
                    title_element = appmsg.find("title")
                    if title_element is not None and title_element.text:
                        xml_title = title_element.text.strip()
                    # 提取引用消息内容
                    refermsg = appmsg.find("refermsg")
                    if refermsg is not None:
                        content_element = refermsg.find("content")
                        if content_element is not None and content_element.text:
                            refer_content = content_element.text.strip()
            except ET.ParseError as e:
                logger.warning(f"XML解析失败: {str(e)}, content={content}")
                # 如果XML解析失败,使用原始content
                refer_content = content
            # 确定最终使用的title:优先使用XML中的title,其次使用data.title
            final_title = xml_title if xml_title else title
            # 组合内容:refer_content在前,final_title在后,中间空一行
            if refer_content and final_title:
                combined_content = f"{refer_content}\n\n{final_title}"
            elif refer_content:
                combined_content = refer_content
            elif final_title:
                combined_content = final_title
            else:
                combined_content = content  # 如果都没有,使用原始content
            logger.info(f"引用消息内容提取完成: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}")
            return combined_content
        except Exception as e:
            logger.error(f"提取引用消息内容异常: error={str(e)}")
            # 异常情况下返回原始content
            return callback_data.get("data", {}).get("content", "")
    def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]:
        """
@@ -83,9 +147,10 @@
        Returns:
            是否是有效的群聊消息
        """
        # 检查消息类型是否是群聊消息(80001)
        # 检查消息类型是否是群聊消息(80001)或引用消息(80014)
        message_type = callback_data.get("messageType")
        if message_type != "80001":
        print(f"data: {callback_data}")
        if message_type not in ["80001", "80014"]:
            logger.info(f"忽略非群聊消息: messageType={message_type}")
            return False
@@ -150,11 +215,23 @@
        if not self.is_valid_group_message(callback_data):
            return False
        data = callback_data.get("data", {})
        from_user = data.get("fromUser")
        # 导入消息聚合服务(延迟导入避免循环依赖)
        from app.services.message_aggregator import message_aggregator
        # 将整个回调内容转成JSON字符串存储到Redis队列
        return redis_queue.enqueue_message(from_user, callback_data)
        # 尝试将消息添加到聚合中
        should_process_immediately, aggregated_data = message_aggregator.add_message_to_aggregation(callback_data)
        if should_process_immediately:
            # 需要立即处理(不聚合或聚合超时)
            data = aggregated_data.get("data", {}) if aggregated_data else callback_data.get("data", {})
            from_user = data.get("fromUser")
            # 将消息加入Redis队列
            return redis_queue.enqueue_message(from_user, aggregated_data or callback_data)
        else:
            # 消息已被聚合,不需要立即处理
            logger.info(f"消息已添加到聚合队列,等待聚合处理: fromUser={callback_data.get('data', {}).get('fromUser')}")
            return True
    def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool:
        """
@@ -230,8 +307,15 @@
            from_group = data.get("fromGroup")
            content = data.get("content")
            w_id = data.get("wId")
            message_type = message_data.get("messageType")
            logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}")
            logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}, message_type={message_type}")
            # 根据消息类型处理内容
            if message_type == "80014":
                # 引用消息,需要提取XML中的内容并与title组合
                content = self.extract_refer_message_content(message_data)
                logger.info(f"引用消息内容处理完成: content_length={len(content)}")
            # 使用上下文管理器确保数据库会话正确管理
            with next(get_db()) as db:
config.py
@@ -111,6 +111,11 @@
        self.keyword_filter_enabled = keyword_filter_config.get("enabled", False)
        self.keyword_filter_keywords = keyword_filter_config.get("keywords", [])
        # 消息聚合配置
        message_aggregation_config = config_data.get("message_aggregation", {})
        self.message_aggregation_enabled = message_aggregation_config.get("enabled", True)
        self.message_aggregation_timeout = message_aggregation_config.get("timeout_seconds", 15)
    def update_ecloud_w_id(self, new_w_id: str) -> bool:
        """
        动态更新E云管家的w_id配置
ecloud_dify.spec
@@ -11,7 +11,6 @@
# 添加配置文件
datas.append(('config.example.json', '.'))
datas.append(('config.production.json', '.'))
if os.path.exists('config.json'):
    datas.append(('config.json', '.'))
logs/app.log
@@ -1,19 +1,5 @@
2025-07-28 09:54:56 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务
2025-07-28 10:41:14 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务
2025-07-28 10:42:46 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务
2025-07-28 11:10:52 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务
2025-07-28 11:23:47 | INFO | __main__:<module>:124 - 启动E云管家-DifyAI对接服务
2025-07-28 11:41:55 | INFO | __main__:<module>:124 - 启动E云管家-DifyAI对接服务
2025-07-28 11:44:17 | INFO | __main__:<module>:124 - 启动E云管家-DifyAI对接服务
2025-07-28 14:08:32 | INFO | __main__:<module>:124 - 启动E云管家-DifyAI对接服务
2025-07-28 14:31:14 | INFO | __main__:<module>:124 - 启动E云管家-DifyAI对接服务
2025-07-28 16:07:05 | INFO | __main__:<module>:139 - 启动E云管家-DifyAI对接服务
2025-07-28 16:23:36 | INFO | __main__:<module>:152 - 启动E云管家-DifyAI对接服务
2025-07-28 16:39:45 | INFO | __main__:<module>:159 - 启动E云管家-DifyAI对接服务
2025-07-28 17:17:10 | INFO | __main__:<module>:161 - 启动E云管家-DifyAI对接服务
2025-07-28 17:18:18 | INFO | __main__:<module>:161 - 启动E云管家-DifyAI对接服务
2025-07-28 17:29:25 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务
2025-07-28 17:29:50 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务
2025-07-28 17:34:00 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务
2025-07-28 17:44:45 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务
2025-07-28 18:12:07 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务
2025-08-07 16:49:53 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务
2025-08-07 17:07:18 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务
2025-08-07 17:09:51 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务
2025-08-07 17:48:05 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务
2025-08-07 17:51:18 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务
main.py
@@ -11,9 +11,11 @@
from config import settings
from app.api.callback import router as callback_router
from app.api.friend_ignore import router as friend_ignore_router
from app.api.message_aggregation import router as message_aggregation_router
from app.models.database import create_tables
from app.workers.message_worker import message_worker
from app.services.contact_sync import contact_sync_service
from app.services.message_aggregator import message_aggregator
@asynccontextmanager
@@ -57,6 +59,13 @@
    # 关闭时执行
    logger.info("正在关闭应用...")
    # 停止消息聚合服务
    try:
        message_aggregator.stop()
        logger.info("消息聚合服务已停止")
    except Exception as e:
        logger.error(f"消息聚合服务停止失败: {str(e)}")
    # 停止消息工作进程
    try:
        message_worker.stop()
@@ -87,6 +96,7 @@
# 注册路由
app.include_router(callback_router, prefix="/api/v1", tags=["回调接口"])
app.include_router(friend_ignore_router, prefix="/api/v1", tags=["好友忽略管理"])
app.include_router(message_aggregation_router, prefix="/api/v1/aggregation", tags=["消息聚合管理"])
@app.get("/")
startup.py
New file
@@ -0,0 +1,128 @@
"""
启动脚本 - 处理配置文件和目录初始化
"""
import os
import sys
import json
import shutil
from pathlib import Path
def get_exe_dir():
    """Return the directory the program is running from.

    For a PyInstaller-frozen executable (sys.frozen set) this is the folder
    containing the .exe; otherwise it is the folder of this source file.
    """
    if getattr(sys, "frozen", False):
        # Packaged executable: anchor on the binary's location.
        return os.path.dirname(sys.executable)
    # Development environment: anchor on this file's location.
    return os.path.dirname(os.path.abspath(__file__))
def ensure_directories():
    """Create the runtime directories (logs/) if missing; return the base dir."""
    base_dir = get_exe_dir()
    log_dir = os.path.join(base_dir, "logs")
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
        print(f"创建日志目录: {log_dir}")
    return base_dir
def ensure_config_file():
    """Ensure config.json exists next to the executable; return its path.

    When missing, copy the production template if present, else the example
    template, else write a built-in default configuration.
    """
    base_dir = get_exe_dir()
    config_path = os.path.join(base_dir, "config.json")
    if os.path.exists(config_path):
        return config_path
    # Templates in priority order, each with its log label.
    candidates = [
        (os.path.join(base_dir, "config.production.json"), "复制生产配置文件"),
        (os.path.join(base_dir, "config.example.json"), "复制示例配置文件"),
    ]
    for template_path, label in candidates:
        if os.path.exists(template_path):
            shutil.copy2(template_path, config_path)
            print(f"{label}: {template_path} -> {config_path}")
            return config_path
    # No template available: write a default configuration file.
    default_config = {
        "database": {
            "url": "mysql+pymysql://root:password@localhost:3306/ecloud_dify"
        },
        "redis": {"url": "redis://localhost:6379/0"},
        "ecloud": {
            "base_url": "http://125.122.152.142:9899",
            "authorization": "your_ecloud_authorization_token",
            "w_id": "your_ecloud_w_id",
        },
        "dify": {
            "base_url": "https://api.dify.ai/v1",
            "api_key": "your_dify_api_key",
        },
        "server": {"host": "0.0.0.0", "port": 7979, "debug": False},
        "logging": {"level": "INFO", "file": "logs/app.log"},
        "message_processing": {
            "max_retry_count": 3,
            "retry_delay": 5,
            "queue_timeout": 300,
        },
        "customer_service": {"names": ["客服1", "客服2"]},
    }
    with open(config_path, "w", encoding="utf-8") as f:
        json.dump(default_config, f, indent=2, ensure_ascii=False)
    print(f"创建默认配置文件: {config_path}")
    return config_path
def setup_environment():
    """Prepare directories and config, then chdir to the program's base dir.

    Returns:
        (base_dir, config_path) tuple.
    """
    base_dir = ensure_directories()
    config_path = ensure_config_file()
    # Relative paths in the config (e.g. logs/app.log) resolve from here.
    os.chdir(base_dir)
    print(f"设置工作目录: {base_dir}")
    return base_dir, config_path
if __name__ == "__main__":
    # Prepare directories/config and set the working directory BEFORE
    # importing modules that read the config at import time.
    setup_environment()
    # Import and launch the main application.
    try:
        from main import app
        import uvicorn
        from config import settings
        from loguru import logger
        # Configure file logging: daily rotation, 7-day retention.
        logger.add(
            settings.log_file,
            rotation="1 day",
            retention="7 days",
            level=settings.log_level,
            format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
        )
        logger.info("启动E云管家-DifyAI对接服务")
        # Start the HTTP server.
        uvicorn.run(
            app,
            host=settings.server_host,
            port=settings.server_port,
            log_level=settings.log_level.lower(),
        )
    except Exception as e:
        # Keep the console window open so the operator can read the error.
        print(f"启动失败: {e}")
        input("按任意键退出...")
        sys.exit(1)
tests/test_message_aggregator.py
New file
@@ -0,0 +1,405 @@
"""
消息聚合服务测试
"""
import time
import threading
import pytest
from unittest.mock import patch, MagicMock
from app.services.message_aggregator import MessageAggregatorService, PendingMessage, MessageAggregation
class TestMessageAggregatorService:
    """Tests for MessageAggregatorService aggregation behaviour."""

    def setup_method(self):
        """Create a fresh aggregator with a short timeout suitable for tests."""
        self.aggregator = MessageAggregatorService()
        # Short timeout so timeout-driven tests finish quickly.
        self.aggregator.aggregation_timeout = 2
        self.aggregator.aggregation_enabled = True

    def teardown_method(self):
        """Stop background timers/threads after each test."""
        self.aggregator.stop()

    @staticmethod
    def _make_message(
        content="测试消息",
        message_type="80001",
        from_user="wxid_test123",
        from_group="group123@chatroom",
        self_sent=False,
    ):
        """Build a webhook-style message payload with test defaults.

        Pass ``from_group=None`` to omit the ``fromGroup`` field entirely
        (used to exercise the missing-field validation path).
        """
        data = {"fromUser": from_user, "content": content, "self": self_sent}
        if from_group is not None:
            data["fromGroup"] = from_group
        return {"messageType": message_type, "data": data}

    def test_should_aggregate_message_valid(self):
        """A normal group-chat message from another user should be aggregated."""
        assert self.aggregator.should_aggregate_message(self._make_message()) is True

    def test_should_aggregate_message_disabled(self):
        """Nothing is aggregated while the feature is disabled."""
        self.aggregator.aggregation_enabled = False
        assert self.aggregator.should_aggregate_message(self._make_message()) is False

    def test_should_aggregate_message_wrong_type(self):
        """Message types other than 80001 (group chat) are not aggregated."""
        msg = self._make_message(message_type="80002")  # not a group-chat message
        assert self.aggregator.should_aggregate_message(msg) is False

    def test_should_aggregate_message_self_message(self):
        """Messages sent by the account itself are not aggregated."""
        msg = self._make_message(self_sent=True)
        assert self.aggregator.should_aggregate_message(msg) is False

    def test_should_aggregate_message_missing_fields(self):
        """Messages missing a required field (fromGroup) are not aggregated."""
        msg = self._make_message(from_group=None)
        assert self.aggregator.should_aggregate_message(msg) is False

    def test_add_message_to_aggregation_no_aggregation(self):
        """With aggregation disabled the message passes straight through."""
        self.aggregator.aggregation_enabled = False
        msg = self._make_message()
        should_process, aggregated_data = self.aggregator.add_message_to_aggregation(msg)
        assert should_process is True
        assert aggregated_data == msg

    def test_add_message_to_aggregation_first_message(self):
        """The first message is buffered, not processed immediately."""
        should_process, aggregated_data = self.aggregator.add_message_to_aggregation(
            self._make_message(content="第一条消息")
        )
        assert should_process is False
        assert aggregated_data is None
        # One active aggregation, keyed by "<group>:<user>".
        status = self.aggregator.get_aggregation_status()
        assert status["active_aggregations"] == 1
        assert "group123@chatroom:wxid_test123" in status["aggregations"]

    def test_add_message_to_aggregation_multiple_messages(self):
        """Messages from the same user/group accumulate in one aggregation."""
        should_process_1, _ = self.aggregator.add_message_to_aggregation(
            self._make_message(content="第一条消息")
        )
        assert should_process_1 is False
        should_process_2, _ = self.aggregator.add_message_to_aggregation(
            self._make_message(content="第二条消息")
        )
        assert should_process_2 is False
        status = self.aggregator.get_aggregation_status()
        aggregation_info = status["aggregations"]["group123@chatroom:wxid_test123"]
        assert aggregation_info["message_count"] == 2

    @patch('app.services.message_processor.message_processor')
    def test_process_aggregated_messages_timeout(self, mock_message_processor):
        """Buffered messages are flushed to the processor after the timeout."""
        mock_message_processor.process_single_message.return_value = True
        should_process, _ = self.aggregator.add_message_to_aggregation(self._make_message())
        assert should_process is False
        # Wait past the 2-second test timeout so the flush timer fires.
        time.sleep(2.5)
        mock_message_processor.process_single_message.assert_called_once()
        # The aggregation must be cleaned up after the flush.
        status = self.aggregator.get_aggregation_status()
        assert status["active_aggregations"] == 0

    def test_get_aggregation_status(self):
        """Status reflects configuration and any active aggregations."""
        # Initial state: enabled, test timeout, nothing buffered.
        status = self.aggregator.get_aggregation_status()
        assert status["enabled"] is True
        assert status["timeout"] == 2
        assert status["active_aggregations"] == 0
        assert status["aggregations"] == {}
        # After buffering one message the status exposes its details.
        self.aggregator.add_message_to_aggregation(self._make_message())
        status = self.aggregator.get_aggregation_status()
        assert status["active_aggregations"] == 1
        assert "group123@chatroom:wxid_test123" in status["aggregations"]
        aggregation_info = status["aggregations"]["group123@chatroom:wxid_test123"]
        assert aggregation_info["from_user"] == "wxid_test123"
        assert aggregation_info["from_group"] == "group123@chatroom"
        assert aggregation_info["message_count"] == 1

    @patch('app.services.message_processor.message_processor')
    def test_force_process_aggregation(self, mock_message_processor):
        """force_process_aggregation flushes the buffer immediately."""
        mock_message_processor.process_single_message.return_value = True
        self.aggregator.add_message_to_aggregation(self._make_message())
        result = self.aggregator.force_process_aggregation("wxid_test123", "group123@chatroom")
        assert result is True
        mock_message_processor.process_single_message.assert_called_once()
        # The aggregation must be cleaned up after the forced flush.
        status = self.aggregator.get_aggregation_status()
        assert status["active_aggregations"] == 0

    def test_force_process_aggregation_not_found(self):
        """Forcing a flush when nothing is buffered returns False."""
        result = self.aggregator.force_process_aggregation("wxid_test123", "group123@chatroom")
        assert result is False
class TestPendingMessage:
    """Tests for the PendingMessage value object."""

    def test_pending_message_creation(self):
        """All constructor arguments are stored unchanged on the instance."""
        payload = {"test": "data"}
        now = time.time()
        msg = PendingMessage(
            message_data=payload,
            timestamp=now,
            content="测试内容",
            from_user="wxid_test123",
            from_group="group123@chatroom",
        )
        assert msg.message_data == payload
        assert msg.timestamp == now
        assert msg.content == "测试内容"
        assert msg.from_user == "wxid_test123"
        assert msg.from_group == "group123@chatroom"
class TestMessageAggregation:
    """Tests for the MessageAggregation buffer object."""

    @staticmethod
    def _new_aggregation():
        """Create an empty aggregation for the standard test user/group."""
        return MessageAggregation(
            from_user="wxid_test123",
            from_group="group123@chatroom",
        )

    @staticmethod
    def _pending(content="测试内容", message_data=None, timestamp=None):
        """Build a PendingMessage with test defaults for the standard user/group."""
        return PendingMessage(
            message_data={"test": "data"} if message_data is None else message_data,
            timestamp=time.time() if timestamp is None else timestamp,
            content=content,
            from_user="wxid_test123",
            from_group="group123@chatroom",
        )

    def test_message_aggregation_creation(self):
        """A new aggregation starts empty, with zeroed timestamps and no timer."""
        aggregation = self._new_aggregation()
        assert aggregation.from_user == "wxid_test123"
        assert aggregation.from_group == "group123@chatroom"
        assert len(aggregation.messages) == 0
        assert aggregation.first_message_time == 0.0
        assert aggregation.last_message_time == 0.0
        assert aggregation.timer is None

    def test_add_message(self):
        """Adding the first message sets both the first and last timestamps."""
        aggregation = self._new_aggregation()
        now = time.time()
        aggregation.add_message(self._pending(timestamp=now))
        assert len(aggregation.messages) == 1
        assert aggregation.first_message_time == now
        assert aggregation.last_message_time == now

    def test_get_aggregated_content(self):
        """Buffered message contents are joined with newlines, in arrival order."""
        aggregation = self._new_aggregation()
        base = time.time()
        for i in range(3):
            aggregation.add_message(
                self._pending(
                    content=f"消息{i+1}",
                    message_data={"test": f"data{i}"},
                    timestamp=base + i,
                )
            )
        assert aggregation.get_aggregated_content() == "消息1\n消息2\n消息3"

    def test_get_latest_message_data(self):
        """get_latest_message_data returns the payload of the newest message."""
        aggregation = self._new_aggregation()
        base = time.time()
        for i in range(3):
            aggregation.add_message(
                self._pending(
                    content=f"消息{i+1}",
                    message_data={"test": f"data{i}", "timestamp": i},
                    timestamp=base + i,
                )
            )
        latest_data = aggregation.get_latest_message_data()
        assert latest_data["test"] == "data2"  # payload of the last message added
        assert latest_data["timestamp"] == 2

    def test_clear(self):
        """clear() empties the buffer, resets timestamps and drops the timer."""
        aggregation = self._new_aggregation()
        aggregation.add_message(self._pending())
        # Attach a timer so clear() must also discard it.
        aggregation.timer = threading.Timer(1, lambda: None)
        aggregation.clear()
        assert len(aggregation.messages) == 0
        assert aggregation.first_message_time == 0.0
        assert aggregation.last_message_time == 0.0
        assert aggregation.timer is None