新增消息聚合:短时间内同一用户发送的消息聚合后再发送给AI
New file |
| | |
| | | """ |
| | | 消息聚合管理接口 |
| | | """ |
| | | |
| | | from fastapi import APIRouter, HTTPException |
| | | from pydantic import BaseModel |
| | | from typing import Dict, Any, Optional |
| | | from loguru import logger |
| | | |
| | | from app.services.message_aggregator import message_aggregator |
| | | |
| | | |
| | | router = APIRouter() |
| | | |
| | | |
class AggregationStatusResponse(BaseModel):
    """Response envelope returned by the aggregation status endpoint."""

    # Whether the request was served successfully.
    success: bool
    # Human-readable description of the outcome.
    message: str
    # Status payload passed through from the aggregator service.
    data: Dict[str, Any]
| | | |
| | | |
class ForceProcessRequest(BaseModel):
    """Request body identifying the aggregation that should be flushed now."""

    # Sender whose pending messages should be processed.
    from_user: str
    # Group in which those messages were sent.
    from_group: str
| | | |
| | | |
class ForceProcessResponse(BaseModel):
    """Response envelope returned by the force-process endpoint."""

    # Whether the request itself was handled without error.
    success: bool
    # Human-readable description of the outcome.
    message: str
    # True when a pending aggregation existed and was processed.
    processed: bool
| | | |
| | | |
@router.get("/status", response_model=AggregationStatusResponse)
async def get_aggregation_status():
    """
    Report the current state of the message aggregator.

    Returns:
        An AggregationStatusResponse wrapping the service's status snapshot.

    Raises:
        HTTPException: status 500 when the snapshot cannot be produced.
    """
    try:
        snapshot = message_aggregator.get_aggregation_status()
        payload = AggregationStatusResponse(
            success=True, message="获取聚合状态成功", data=snapshot
        )
    except Exception as exc:
        reason = str(exc)
        logger.error(f"获取聚合状态异常: {reason}")
        raise HTTPException(status_code=500, detail=f"获取聚合状态失败: {reason}")

    return payload
| | | |
| | | |
@router.post("/force-process", response_model=ForceProcessResponse)
async def force_process_aggregation(request: ForceProcessRequest):
    """
    Flush the pending aggregation of one user in one group immediately.

    Args:
        request: Identifies the user and group whose messages to process.

    Returns:
        A ForceProcessResponse; ``processed`` tells whether anything was pending.

    Raises:
        HTTPException: status 500 when the aggregator raises.
    """
    user_id = request.from_user
    group_id = request.from_group

    try:
        logger.info(
            f"强制处理聚合消息请求: from_user={user_id}, from_group={group_id}"
        )

        handled = message_aggregator.force_process_aggregation(user_id, group_id)

        # Both outcomes are a successful request; only the detail differs.
        note = "聚合消息处理成功" if handled else "没有找到待聚合的消息"
        return ForceProcessResponse(
            success=True, message=note, processed=bool(handled)
        )

    except Exception as exc:
        reason = str(exc)
        logger.error(
            f"强制处理聚合消息异常: from_user={user_id}, from_group={group_id}, error={reason}"
        )
        raise HTTPException(status_code=500, detail=f"强制处理聚合消息失败: {reason}")
| | | |
| | | |
@router.get("/config")
async def get_aggregation_config():
    """
    Return the aggregator's current configuration.

    Returns:
        A dict with ``success``, ``message`` and a ``data`` section holding
        the enabled flag and the timeout in seconds.

    Raises:
        HTTPException: status 500 when the configuration cannot be read.
    """
    try:
        return {
            "success": True,
            "message": "获取聚合配置成功",
            "data": {
                "enabled": message_aggregator.aggregation_enabled,
                "timeout_seconds": message_aggregator.aggregation_timeout,
            },
        }

    except Exception as exc:
        reason = str(exc)
        logger.error(f"获取聚合配置异常: {reason}")
        raise HTTPException(status_code=500, detail=f"获取聚合配置失败: {reason}")
| | | |
| | | |
@router.get("/health")
async def aggregation_health_check():
    """
    Health probe for the message aggregation service.

    Returns:
        A dict reporting ``healthy`` together with the enabled flag and the
        number of active aggregations taken from the status snapshot.

    Raises:
        HTTPException: status 500 when the status snapshot cannot be read.
    """
    try:
        snapshot = message_aggregator.get_aggregation_status()
        details = {
            "service_status": "healthy",
            "enabled": snapshot["enabled"],
            "active_aggregations": snapshot["active_aggregations"],
        }
        return {
            "success": True,
            "message": "消息聚合服务运行正常",
            "data": details,
        }

    except Exception as exc:
        reason = str(exc)
        logger.error(f"聚合服务健康检查异常: {reason}")
        raise HTTPException(status_code=500, detail=f"聚合服务健康检查失败: {reason}")
| | |
| | | if keyword in filtered_content: |
| | | filtered_content = filtered_content.replace(keyword, "") |
| | | logger.info(f"过滤关键词: {keyword}") |
| | | |
| | | # 去除前后空格 |
| | | filtered_content = filtered_content.strip() |
| | | return filtered_content |
| | | |
| | | def get_contact_info(self, w_id: str, wc_id: str) -> Optional[Dict[str, Any]]: |
New file |
| | |
| | | """ |
| | | 消息聚合服务 |
| | | 实现同一群组中同一用户短时间内多条消息的聚合功能 |
| | | """ |
| | | |
| | | import time |
| | | import threading |
| | | import json |
| | | from typing import Dict, List, Optional, Any, Tuple |
| | | from loguru import logger |
| | | from dataclasses import dataclass, field |
| | | from datetime import datetime, timedelta |
| | | |
| | | from config import settings |
| | | |
| | | |
@dataclass
class PendingMessage:
    """A single message waiting to be merged with its neighbours."""

    # Full message payload as it was handed to the aggregator.
    message_data: Dict[str, Any]
    # Time the message was queued, in seconds since the epoch.
    timestamp: float
    # Text content of the message.
    content: str
    # Sender identifier.
    from_user: str
    # Group identifier.
    from_group: str
| | | |
| | | |
@dataclass
class MessageAggregation:
    """
    Buffer of pending messages for one (group, user) pair.

    All accessors take ``lock`` so the buffer can be shared between the
    thread that adds messages and the timer thread that flushes them.
    """

    from_user: str
    from_group: str
    # Quoted so the class does not need PendingMessage at definition time.
    messages: List["PendingMessage"] = field(default_factory=list)
    first_message_time: float = 0.0
    last_message_time: float = 0.0
    # Pending flush timer, if one is armed.
    timer: Optional[threading.Timer] = None
    lock: threading.Lock = field(default_factory=threading.Lock)

    def add_message(self, message: "PendingMessage"):
        """Append *message* and update the first/last arrival times."""
        with self.lock:
            self.messages.append(message)
            if not self.first_message_time:
                self.first_message_time = message.timestamp
            self.last_message_time = message.timestamp

    def get_aggregated_content(self) -> str:
        """Return all message contents joined by newlines, oldest first.

        Returns an empty string when nothing is buffered.
        """
        with self.lock:
            if not self.messages:
                return ""

            ordered = sorted(self.messages, key=lambda m: m.timestamp)
            return "\n".join(m.content for m in ordered)

    def get_latest_message_data(self) -> Optional[Dict[str, Any]]:
        """Return a copy of the newest message's payload to use as a template.

        The copy is deep: callers overwrite nested fields such as
        ``["data"]["content"]``, and a shallow copy would let that write
        leak into the buffered original message.

        Returns:
            The copied payload, or None when nothing is buffered.
        """
        import copy

        with self.lock:
            if not self.messages:
                return None

            newest = max(self.messages, key=lambda m: m.timestamp)
            return copy.deepcopy(newest.message_data)

    def clear(self):
        """Drop all buffered messages, reset the times and cancel the timer."""
        with self.lock:
            self.messages.clear()
            self.first_message_time = 0.0
            self.last_message_time = 0.0
            if self.timer:
                self.timer.cancel()
                self.timer = None
| | | |
| | | |
class MessageAggregatorService:
    """
    Message aggregation service.

    Buffers consecutive messages from the same user in the same group and,
    once no new message has arrived for ``aggregation_timeout`` seconds,
    hands them to the message processor as one combined message.

    Locking: ``self.lock`` guards the ``aggregations`` dict and is always
    taken before an aggregation's own lock, never the other way round.
    ``self.lock`` is a plain (non-reentrant) lock, so no method calls another
    method that takes it while still holding it.
    """

    def __init__(self):
        """Create empty state and read the timeout / enabled flags from settings."""
        self.aggregations: Dict[str, MessageAggregation] = {}
        self.lock = threading.Lock()
        # Fall back to 15 seconds / enabled when settings lacks the attributes.
        self.aggregation_timeout = getattr(settings, 'message_aggregation_timeout', 15)
        self.aggregation_enabled = getattr(settings, 'message_aggregation_enabled', True)

        logger.info(f"消息聚合服务初始化: timeout={self.aggregation_timeout}s, enabled={self.aggregation_enabled}")

    def _get_aggregation_key(self, from_user: str, from_group: str) -> str:
        """Build the dict key identifying one user inside one group."""
        return f"{from_group}:{from_user}"

    def should_aggregate_message(self, message_data: Dict[str, Any]) -> bool:
        """
        Decide whether a message is eligible for aggregation.

        Args:
            message_data: Message payload.

        Returns:
            True only when aggregation is enabled, the message type is
            "80001", the message was not sent by ourselves, and sender,
            group and content are all present.
        """
        if not self.aggregation_enabled:
            return False

        if message_data.get("messageType") != "80001":
            return False

        data = message_data.get("data", {})

        # Messages flagged as sent by ourselves are never aggregated.
        if data.get("self", False):
            return False

        if not data.get("fromUser") or not data.get("fromGroup") or not data.get("content"):
            return False

        return True

    def add_message_to_aggregation(self, message_data: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
        """
        Buffer a message, or tell the caller to process it right away.

        Args:
            message_data: Message payload.

        Returns:
            ``(True, message_data)`` when the message is not eligible for
            aggregation and must be processed by the caller immediately;
            ``(False, None)`` when it was buffered and will be processed by
            the timer callback later.
        """
        if not self.should_aggregate_message(message_data):
            return True, message_data

        data = message_data.get("data", {})
        from_user = data.get("fromUser")
        from_group = data.get("fromGroup")
        content = data.get("content", "")

        aggregation_key = self._get_aggregation_key(from_user, from_group)

        pending_message = PendingMessage(
            message_data=message_data,
            timestamp=time.time(),
            content=content,
            from_user=from_user,
            from_group=from_group
        )

        # Lookup, timer cancellation, append and timer re-arm happen under the
        # service lock as one step, so the timer callback (which removes the
        # aggregation under the same lock) cannot slip in between and leave
        # this message in a buffer nobody will ever flush.
        with self.lock:
            aggregation = self.aggregations.get(aggregation_key)
            if aggregation is None:
                aggregation = MessageAggregation(
                    from_user=from_user,
                    from_group=from_group
                )
                self.aggregations[aggregation_key] = aggregation

            # Every new message restarts the quiet-period countdown.
            with aggregation.lock:
                if aggregation.timer:
                    aggregation.timer.cancel()

            aggregation.add_message(pending_message)

            timer = threading.Timer(
                self.aggregation_timeout,
                self._process_aggregated_messages,
                args=(aggregation_key,)
            )

            with aggregation.lock:
                aggregation.timer = timer
                total_messages = len(aggregation.messages)

            timer.start()

        logger.info(f"消息已添加到聚合: user={from_user}, group={from_group}, "
                    f"total_messages={total_messages}, timeout={self.aggregation_timeout}s")

        return False, None

    def _process_aggregated_messages(self, aggregation_key: str):
        """
        Flush one aggregation (timer callback, also used by force processing).

        The aggregation is removed from the registry atomically before its
        content is read; a message arriving afterwards starts a fresh
        aggregation instead of being dropped together with the old one.

        Args:
            aggregation_key: Key of the aggregation to flush.
        """
        import copy

        aggregation = None
        try:
            with self.lock:
                aggregation = self.aggregations.pop(aggregation_key, None)

            if aggregation is None:
                logger.warning(f"聚合键不存在: {aggregation_key}")
                return

            aggregated_content = aggregation.get_aggregated_content()
            latest_message_data = aggregation.get_latest_message_data()
            with aggregation.lock:
                message_count = len(aggregation.messages)

            # Everything needed has been captured; release the buffer and
            # cancel any timer still armed for it.
            aggregation.clear()
            logger.debug(f"聚合数据已清理: {aggregation_key}")

            if not aggregated_content or not latest_message_data:
                logger.warning(f"聚合数据为空: {aggregation_key}")
                return

            # Deep copy: the nested "data" dict is rewritten below and must
            # not alias the payload of the originally received message.
            aggregated_message_data = copy.deepcopy(latest_message_data)
            aggregated_message_data["data"]["content"] = aggregated_content

            logger.info(f"处理聚合消息: user={aggregation.from_user}, group={aggregation.from_group}, "
                        f"message_count={message_count}, aggregated_content_length={len(aggregated_content)}")

            # Imported lazily to avoid a circular import with the processor.
            from app.services.message_processor import message_processor
            success = message_processor.process_single_message(aggregated_message_data)

            if success:
                logger.info(f"聚合消息处理成功: user={aggregation.from_user}, group={aggregation.from_group}")
            else:
                logger.error(f"聚合消息处理失败: user={aggregation.from_user}, group={aggregation.from_group}")

        except Exception as e:
            logger.error(f"处理聚合消息异常: aggregation_key={aggregation_key}, error={str(e)}")
            # Only release the aggregation taken above. Cleaning up by key
            # here could wipe a newer aggregation registered under the same
            # key in the meantime.
            if aggregation is not None:
                aggregation.clear()

    def _cleanup_aggregation(self, aggregation_key: str):
        """
        Remove an aggregation from the registry and release its resources.

        Args:
            aggregation_key: Key of the aggregation to remove.
        """
        try:
            with self.lock:
                aggregation = self.aggregations.pop(aggregation_key, None)

            if aggregation is not None:
                aggregation.clear()
                logger.debug(f"聚合数据已清理: {aggregation_key}")
        except Exception as e:
            logger.error(f"清理聚合数据异常: aggregation_key={aggregation_key}, error={str(e)}")

    def get_aggregation_status(self) -> Dict[str, Any]:
        """
        Take a snapshot of the service state.

        Returns:
            A dict with the enabled flag, the timeout, the number of active
            aggregations and per-aggregation details keyed by aggregation key.
        """
        with self.lock:
            status = {
                "enabled": self.aggregation_enabled,
                "timeout": self.aggregation_timeout,
                "active_aggregations": len(self.aggregations),
                "aggregations": {}
            }

            for key, aggregation in self.aggregations.items():
                with aggregation.lock:
                    started = aggregation.first_message_time
                    status["aggregations"][key] = {
                        "from_user": aggregation.from_user,
                        "from_group": aggregation.from_group,
                        "message_count": len(aggregation.messages),
                        "first_message_time": started,
                        "last_message_time": aggregation.last_message_time,
                        "elapsed_time": time.time() - started if started else 0
                    }

            return status

    def force_process_aggregation(self, from_user: str, from_group: str) -> bool:
        """
        Flush the aggregation of one user in one group without waiting.

        Args:
            from_user: User identifier.
            from_group: Group identifier.

        Returns:
            True when a pending aggregation was found and handed to
            processing, False when none existed.
        """
        aggregation_key = self._get_aggregation_key(from_user, from_group)

        # Only the lookup happens under the lock; the processing call takes
        # the same non-reentrant lock itself and would deadlock otherwise.
        with self.lock:
            pending = aggregation_key in self.aggregations

        if not pending:
            logger.warning(f"没有找到待聚合的消息: user={from_user}, group={from_group}")
            return False

        logger.info(f"强制处理聚合消息: user={from_user}, group={from_group}")
        self._process_aggregated_messages(aggregation_key)
        return True

    def stop(self):
        """Cancel every pending timer and discard all buffered aggregations."""
        logger.info("正在停止消息聚合服务...")

        with self.lock:
            for aggregation_key, aggregation in self.aggregations.items():
                try:
                    with aggregation.lock:
                        if aggregation.timer:
                            aggregation.timer.cancel()
                            logger.debug(f"已取消聚合定时器: {aggregation_key}")
                except Exception as e:
                    logger.error(f"取消聚合定时器异常: {aggregation_key}, error={str(e)}")

            self.aggregations.clear()

        logger.info("消息聚合服务已停止")
| | | |
| | | |
# Module-level message aggregation service instance
message_aggregator = MessageAggregatorService()
| | |
| | | import json |
| | | import time |
| | | import re |
| | | import xml.etree.ElementTree as ET |
| | | from typing import Dict, Any, Optional, List, Tuple |
| | | from datetime import datetime |
| | | from sqlalchemy.orm import Session |
| | |
| | | |
    def __init__(self):
        """Create the instance; no state is set up here."""
        pass
| | | |
| | | def extract_refer_message_content(self, callback_data: Dict[str, Any]) -> str: |
| | | """ |
| | | 提取引用消息的内容(message_type为80014) |
| | | |
| | | Args: |
| | | callback_data: 回调数据 |
| | | |
| | | Returns: |
| | | 组合后的消息内容(content + title,中间空一行) |
| | | """ |
| | | try: |
| | | data = callback_data.get("data", {}) |
| | | content = data.get("content", "") |
| | | title = data.get("title", "") |
| | | |
| | | # 解析XML内容,提取msg>appmsg>refermsg>content标签中的内容 |
| | | refer_content = "" |
| | | xml_title = "" |
| | | try: |
| | | # 解析XML |
| | | root = ET.fromstring(content) |
| | | |
| | | # 查找msg>appmsg>refermsg>content路径 |
| | | appmsg = root.find("appmsg") |
| | | if appmsg is not None: |
| | | # 提取XML中的title(如果存在) |
| | | title_element = appmsg.find("title") |
| | | if title_element is not None and title_element.text: |
| | | xml_title = title_element.text.strip() |
| | | |
| | | # 提取引用消息内容 |
| | | refermsg = appmsg.find("refermsg") |
| | | if refermsg is not None: |
| | | content_element = refermsg.find("content") |
| | | if content_element is not None and content_element.text: |
| | | refer_content = content_element.text.strip() |
| | | |
| | | except ET.ParseError as e: |
| | | logger.warning(f"XML解析失败: {str(e)}, content={content}") |
| | | # 如果XML解析失败,使用原始content |
| | | refer_content = content |
| | | |
| | | # 确定最终使用的title:优先使用XML中的title,其次使用data.title |
| | | final_title = xml_title if xml_title else title |
| | | |
| | | # 组合内容:refer_content在前,final_title在后,中间空一行 |
| | | if refer_content and final_title: |
| | | combined_content = f"{refer_content}\n\n{final_title}" |
| | | elif refer_content: |
| | | combined_content = refer_content |
| | | elif final_title: |
| | | combined_content = final_title |
| | | else: |
| | | combined_content = content # 如果都没有,使用原始content |
| | | |
| | | logger.info(f"引用消息内容提取完成: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}") |
| | | return combined_content |
| | | |
| | | except Exception as e: |
| | | logger.error(f"提取引用消息内容异常: error={str(e)}") |
| | | # 异常情况下返回原始content |
| | | return callback_data.get("data", {}).get("content", "") |
| | | |
| | | def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]: |
| | | """ |
| | |
| | | Returns: |
| | | 是否是有效的群聊消息 |
| | | """ |
| | | # 检查消息类型是否是群聊消息(80001) |
| | | # 检查消息类型是否是群聊消息(80001)或引用消息(80014) |
| | | message_type = callback_data.get("messageType") |
| | | if message_type != "80001": |
| | | print(f"data: {callback_data}") |
| | | if message_type not in ["80001", "80014"]: |
| | | logger.info(f"忽略非群聊消息: messageType={message_type}") |
| | | return False |
| | | |
| | |
| | | if not self.is_valid_group_message(callback_data): |
| | | return False |
| | | |
| | | data = callback_data.get("data", {}) |
| | | from_user = data.get("fromUser") |
| | | # 导入消息聚合服务(延迟导入避免循环依赖) |
| | | from app.services.message_aggregator import message_aggregator |
| | | |
| | | # 将整个回调内容转成JSON字符串存储到Redis队列 |
| | | return redis_queue.enqueue_message(from_user, callback_data) |
| | | # 尝试将消息添加到聚合中 |
| | | should_process_immediately, aggregated_data = message_aggregator.add_message_to_aggregation(callback_data) |
| | | |
| | | if should_process_immediately: |
| | | # 需要立即处理(不聚合或聚合超时) |
| | | data = aggregated_data.get("data", {}) if aggregated_data else callback_data.get("data", {}) |
| | | from_user = data.get("fromUser") |
| | | |
| | | # 将消息加入Redis队列 |
| | | return redis_queue.enqueue_message(from_user, aggregated_data or callback_data) |
| | | else: |
| | | # 消息已被聚合,不需要立即处理 |
| | | logger.info(f"消息已添加到聚合队列,等待聚合处理: fromUser={callback_data.get('data', {}).get('fromUser')}") |
| | | return True |
| | | |
| | | def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool: |
| | | """ |
| | |
| | | from_group = data.get("fromGroup") |
| | | content = data.get("content") |
| | | w_id = data.get("wId") |
| | | message_type = message_data.get("messageType") |
| | | |
| | | logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}") |
| | | logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}, message_type={message_type}") |
| | | |
| | | # 根据消息类型处理内容 |
| | | if message_type == "80014": |
| | | # 引用消息,需要提取XML中的内容并与title组合 |
| | | content = self.extract_refer_message_content(message_data) |
| | | logger.info(f"引用消息内容处理完成: content_length={len(content)}") |
| | | |
| | | # 使用上下文管理器确保数据库会话正确管理 |
| | | with next(get_db()) as db: |
| | |
| | | self.keyword_filter_enabled = keyword_filter_config.get("enabled", False) |
| | | self.keyword_filter_keywords = keyword_filter_config.get("keywords", []) |
| | | |
| | | # 消息聚合配置 |
| | | message_aggregation_config = config_data.get("message_aggregation", {}) |
| | | self.message_aggregation_enabled = message_aggregation_config.get("enabled", True) |
| | | self.message_aggregation_timeout = message_aggregation_config.get("timeout_seconds", 15) |
| | | |
| | | def update_ecloud_w_id(self, new_w_id: str) -> bool: |
| | | """ |
| | | 动态更新E云管家的w_id配置 |
| | |
| | | |
| | | # 添加配置文件 |
| | | datas.append(('config.example.json', '.')) |
| | | datas.append(('config.production.json', '.')) |
| | | if os.path.exists('config.json'): |
| | | datas.append(('config.json', '.')) |
| | | |
| | |
| | | 2025-07-28 09:54:56 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 10:41:14 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 10:42:46 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 11:10:52 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 11:23:47 | INFO | __main__:<module>:124 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 11:41:55 | INFO | __main__:<module>:124 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 11:44:17 | INFO | __main__:<module>:124 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 14:08:32 | INFO | __main__:<module>:124 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 14:31:14 | INFO | __main__:<module>:124 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 16:07:05 | INFO | __main__:<module>:139 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 16:23:36 | INFO | __main__:<module>:152 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 16:39:45 | INFO | __main__:<module>:159 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 17:17:10 | INFO | __main__:<module>:161 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 17:18:18 | INFO | __main__:<module>:161 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 17:29:25 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 17:29:50 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 17:34:00 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 17:44:45 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务 |
| | | 2025-07-28 18:12:07 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务 |
| | | 2025-08-07 16:49:53 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务 |
| | | 2025-08-07 17:07:18 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务 |
| | | 2025-08-07 17:09:51 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务 |
| | | 2025-08-07 17:48:05 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务 |
| | | 2025-08-07 17:51:18 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务 |
| | |
| | | from config import settings |
| | | from app.api.callback import router as callback_router |
| | | from app.api.friend_ignore import router as friend_ignore_router |
| | | from app.api.message_aggregation import router as message_aggregation_router |
| | | from app.models.database import create_tables |
| | | from app.workers.message_worker import message_worker |
| | | from app.services.contact_sync import contact_sync_service |
| | | from app.services.message_aggregator import message_aggregator |
| | | |
| | | |
| | | @asynccontextmanager |
| | |
| | | # 关闭时执行 |
| | | logger.info("正在关闭应用...") |
| | | |
| | | # 停止消息聚合服务 |
| | | try: |
| | | message_aggregator.stop() |
| | | logger.info("消息聚合服务已停止") |
| | | except Exception as e: |
| | | logger.error(f"消息聚合服务停止失败: {str(e)}") |
| | | |
| | | # 停止消息工作进程 |
| | | try: |
| | | message_worker.stop() |
| | |
| | | # 注册路由 |
| | | app.include_router(callback_router, prefix="/api/v1", tags=["回调接口"]) |
| | | app.include_router(friend_ignore_router, prefix="/api/v1", tags=["好友忽略管理"]) |
| | | app.include_router(message_aggregation_router, prefix="/api/v1/aggregation", tags=["消息聚合管理"]) |
| | | |
| | | |
| | | @app.get("/") |
New file |
| | |
| | | """ |
| | | 启动脚本 - 处理配置文件和目录初始化 |
| | | """ |
| | | |
| | | import os |
| | | import sys |
| | | import json |
| | | import shutil |
| | | from pathlib import Path |
| | | |
| | | |
def get_exe_dir():
    """Return the directory the program runs from.

    For a frozen (packaged) build this is the executable's directory;
    otherwise it is the directory containing this script.
    """
    running_frozen = getattr(sys, "frozen", False)
    anchor = sys.executable if running_frozen else os.path.abspath(__file__)
    return os.path.dirname(anchor)
| | | |
| | | |
def ensure_directories():
    """Make sure the directories the service needs exist.

    Creates ``logs`` next to the executable when it is missing.

    Returns:
        The executable directory.
    """
    exe_dir = get_exe_dir()

    logs_dir = os.path.join(exe_dir, "logs")
    if not os.path.exists(logs_dir):
        # exist_ok closes the window in which another process could create
        # the directory between the check above and this call.
        os.makedirs(logs_dir, exist_ok=True)
        print(f"创建日志目录: {logs_dir}")

    return exe_dir
| | | |
| | | |
def ensure_config_file():
    """Make sure ``config.json`` exists next to the executable.

    When it is missing it is created, in order of preference, from
    ``config.production.json``, from ``config.example.json``, or from a
    built-in default configuration.

    Returns:
        Path of the configuration file.
    """
    exe_dir = get_exe_dir()
    config_file = os.path.join(exe_dir, "config.json")

    if os.path.exists(config_file):
        return config_file

    production_config = os.path.join(exe_dir, "config.production.json")
    example_config = os.path.join(exe_dir, "config.example.json")

    if os.path.exists(production_config):
        shutil.copy2(production_config, config_file)
        print(f"复制生产配置文件: {production_config} -> {config_file}")
    elif os.path.exists(example_config):
        shutil.copy2(example_config, config_file)
        print(f"复制示例配置文件: {example_config} -> {config_file}")
    else:
        default_config = {
            "database": {
                "url": "mysql+pymysql://root:password@localhost:3306/ecloud_dify"
            },
            "redis": {"url": "redis://localhost:6379/0"},
            "ecloud": {
                "base_url": "http://125.122.152.142:9899",
                "authorization": "your_ecloud_authorization_token",
                "w_id": "your_ecloud_w_id",
            },
            "dify": {
                "base_url": "https://api.dify.ai/v1",
                "api_key": "your_dify_api_key",
            },
            "server": {"host": "0.0.0.0", "port": 7979, "debug": False},
            "logging": {"level": "INFO", "file": "logs/app.log"},
            "message_processing": {
                "max_retry_count": 3,
                "retry_delay": 5,
                "queue_timeout": 300,
            },
            "customer_service": {"names": ["客服1", "客服2"]},
            # Same values the config loader falls back to when the section
            # is absent; written out so the option is visible to operators.
            "message_aggregation": {"enabled": True, "timeout_seconds": 15},
        }

        with open(config_file, "w", encoding="utf-8") as f:
            json.dump(default_config, f, indent=2, ensure_ascii=False)
        print(f"创建默认配置文件: {config_file}")

    return config_file
| | | |
| | | |
def setup_environment():
    """Prepare directories, configuration and working directory.

    Returns:
        A ``(exe_dir, config_file)`` tuple.
    """
    base_dir = ensure_directories()
    config_path = ensure_config_file()

    # Run from the executable's directory.
    os.chdir(base_dir)
    print(f"设置工作目录: {base_dir}")

    return base_dir, config_path
| | | |
| | | |
if __name__ == "__main__":
    setup_environment()

    # The application is imported only after the environment is prepared.
    try:
        from main import app
        import uvicorn
        from config import settings
        from loguru import logger

        # File logging: rotated daily, kept for a week.
        log_format = "{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}"
        logger.add(
            settings.log_file,
            rotation="1 day",
            retention="7 days",
            level=settings.log_level,
            format=log_format,
        )

        logger.info("启动E云管家-DifyAI对接服务")

        server_options = {
            "host": settings.server_host,
            "port": settings.server_port,
            "log_level": settings.log_level.lower(),
        }
        uvicorn.run(app, **server_options)

    except Exception as exc:
        # Keep the console open so the failure can be read before exiting.
        print(f"启动失败: {exc}")
        input("按任意键退出...")
        sys.exit(1)
New file |
| | |
| | | """ |
| | | 消息聚合服务测试 |
| | | """ |
| | | |
| | | import time |
| | | import threading |
| | | import pytest |
| | | from unittest.mock import patch, MagicMock |
| | | from app.services.message_aggregator import MessageAggregatorService, PendingMessage, MessageAggregation |
| | | |
| | | |
| | | class TestMessageAggregatorService: |
| | | """消息聚合服务测试类""" |
| | | |
| | | def setup_method(self): |
| | | """测试前准备""" |
| | | self.aggregator = MessageAggregatorService() |
| | | # 设置较短的超时时间用于测试 |
| | | self.aggregator.aggregation_timeout = 2 |
| | | self.aggregator.aggregation_enabled = True |
| | | |
| | | def teardown_method(self): |
| | | """测试后清理""" |
| | | self.aggregator.stop() |
| | | |
| | | def test_should_aggregate_message_valid(self): |
| | | """测试有效消息的聚合判断""" |
| | | message_data = { |
| | | "messageType": "80001", |
| | | "data": { |
| | | "fromUser": "wxid_test123", |
| | | "fromGroup": "group123@chatroom", |
| | | "content": "测试消息", |
| | | "self": False |
| | | } |
| | | } |
| | | |
| | | result = self.aggregator.should_aggregate_message(message_data) |
| | | assert result is True |
| | | |
| | | def test_should_aggregate_message_disabled(self): |
| | | """测试聚合功能禁用时的判断""" |
| | | self.aggregator.aggregation_enabled = False |
| | | |
| | | message_data = { |
| | | "messageType": "80001", |
| | | "data": { |
| | | "fromUser": "wxid_test123", |
| | | "fromGroup": "group123@chatroom", |
| | | "content": "测试消息", |
| | | "self": False |
| | | } |
| | | } |
| | | |
| | | result = self.aggregator.should_aggregate_message(message_data) |
| | | assert result is False |
| | | |
| | | def test_should_aggregate_message_wrong_type(self): |
| | | """测试错误消息类型的聚合判断""" |
| | | message_data = { |
| | | "messageType": "80002", # 非群聊消息 |
| | | "data": { |
| | | "fromUser": "wxid_test123", |
| | | "fromGroup": "group123@chatroom", |
| | | "content": "测试消息", |
| | | "self": False |
| | | } |
| | | } |
| | | |
| | | result = self.aggregator.should_aggregate_message(message_data) |
| | | assert result is False |
| | | |
| | | def test_should_aggregate_message_self_message(self): |
| | | """测试自己发送的消息的聚合判断""" |
| | | message_data = { |
| | | "messageType": "80001", |
| | | "data": { |
| | | "fromUser": "wxid_test123", |
| | | "fromGroup": "group123@chatroom", |
| | | "content": "测试消息", |
| | | "self": True # 自己发送的消息 |
| | | } |
| | | } |
| | | |
| | | result = self.aggregator.should_aggregate_message(message_data) |
| | | assert result is False |
| | | |
| | | def test_should_aggregate_message_missing_fields(self): |
| | | """测试缺少必要字段的消息的聚合判断""" |
| | | message_data = { |
| | | "messageType": "80001", |
| | | "data": { |
| | | "fromUser": "wxid_test123", |
| | | # 缺少 fromGroup |
| | | "content": "测试消息", |
| | | "self": False |
| | | } |
| | | } |
| | | |
| | | result = self.aggregator.should_aggregate_message(message_data) |
| | | assert result is False |
| | | |
| | | def test_add_message_to_aggregation_no_aggregation(self): |
| | | """测试不需要聚合的消息""" |
| | | self.aggregator.aggregation_enabled = False |
| | | |
| | | message_data = { |
| | | "messageType": "80001", |
| | | "data": { |
| | | "fromUser": "wxid_test123", |
| | | "fromGroup": "group123@chatroom", |
| | | "content": "测试消息", |
| | | "self": False |
| | | } |
| | | } |
| | | |
| | | should_process, aggregated_data = self.aggregator.add_message_to_aggregation(message_data) |
| | | |
| | | assert should_process is True |
| | | assert aggregated_data == message_data |
| | | |
| | | def test_add_message_to_aggregation_first_message(self): |
| | | """测试添加第一条消息到聚合""" |
| | | message_data = { |
| | | "messageType": "80001", |
| | | "data": { |
| | | "fromUser": "wxid_test123", |
| | | "fromGroup": "group123@chatroom", |
| | | "content": "第一条消息", |
| | | "self": False |
| | | } |
| | | } |
| | | |
| | | should_process, aggregated_data = self.aggregator.add_message_to_aggregation(message_data) |
| | | |
| | | assert should_process is False |
| | | assert aggregated_data is None |
| | | |
| | | # 检查聚合状态 |
| | | status = self.aggregator.get_aggregation_status() |
| | | assert status["active_aggregations"] == 1 |
| | | assert "group123@chatroom:wxid_test123" in status["aggregations"] |
| | | |
| | | def test_add_message_to_aggregation_multiple_messages(self): |
| | | """测试添加多条消息到聚合""" |
| | | message_data_1 = { |
| | | "messageType": "80001", |
| | | "data": { |
| | | "fromUser": "wxid_test123", |
| | | "fromGroup": "group123@chatroom", |
| | | "content": "第一条消息", |
| | | "self": False |
| | | } |
| | | } |
| | | |
| | | message_data_2 = { |
| | | "messageType": "80001", |
| | | "data": { |
| | | "fromUser": "wxid_test123", |
| | | "fromGroup": "group123@chatroom", |
| | | "content": "第二条消息", |
| | | "self": False |
| | | } |
| | | } |
| | | |
| | | # 添加第一条消息 |
| | | should_process_1, _ = self.aggregator.add_message_to_aggregation(message_data_1) |
| | | assert should_process_1 is False |
| | | |
| | | # 添加第二条消息 |
| | | should_process_2, _ = self.aggregator.add_message_to_aggregation(message_data_2) |
| | | assert should_process_2 is False |
| | | |
| | | # 检查聚合状态 |
| | | status = self.aggregator.get_aggregation_status() |
| | | aggregation_info = status["aggregations"]["group123@chatroom:wxid_test123"] |
| | | assert aggregation_info["message_count"] == 2 |
| | | |
| | | @patch('app.services.message_processor.message_processor') |
| | | def test_process_aggregated_messages_timeout(self, mock_message_processor): |
| | | """测试聚合消息超时处理""" |
| | | mock_message_processor.process_single_message.return_value = True |
| | | |
| | | message_data = { |
| | | "messageType": "80001", |
| | | "data": { |
| | | "fromUser": "wxid_test123", |
| | | "fromGroup": "group123@chatroom", |
| | | "content": "测试消息", |
| | | "self": False |
| | | } |
| | | } |
| | | |
| | | # 添加消息到聚合 |
| | | should_process, _ = self.aggregator.add_message_to_aggregation(message_data) |
| | | assert should_process is False |
| | | |
| | | # 等待超时处理 |
| | | time.sleep(2.5) |
| | | |
| | | # 验证消息处理器被调用 |
| | | mock_message_processor.process_single_message.assert_called_once() |
| | | |
| | | # 检查聚合已被清理 |
| | | status = self.aggregator.get_aggregation_status() |
| | | assert status["active_aggregations"] == 0 |
| | | |
| | | def test_get_aggregation_status(self): |
| | | """测试获取聚合状态""" |
| | | # 初始状态 |
| | | status = self.aggregator.get_aggregation_status() |
| | | assert status["enabled"] is True |
| | | assert status["timeout"] == 2 |
| | | assert status["active_aggregations"] == 0 |
| | | assert status["aggregations"] == {} |
| | | |
| | | # 添加消息后的状态 |
| | | message_data = { |
| | | "messageType": "80001", |
| | | "data": { |
| | | "fromUser": "wxid_test123", |
| | | "fromGroup": "group123@chatroom", |
| | | "content": "测试消息", |
| | | "self": False |
| | | } |
| | | } |
| | | |
| | | self.aggregator.add_message_to_aggregation(message_data) |
| | | |
| | | status = self.aggregator.get_aggregation_status() |
| | | assert status["active_aggregations"] == 1 |
| | | assert "group123@chatroom:wxid_test123" in status["aggregations"] |
| | | |
| | | aggregation_info = status["aggregations"]["group123@chatroom:wxid_test123"] |
| | | assert aggregation_info["from_user"] == "wxid_test123" |
| | | assert aggregation_info["from_group"] == "group123@chatroom" |
| | | assert aggregation_info["message_count"] == 1 |
| | | |
| | | @patch('app.services.message_processor.message_processor') |
| | | def test_force_process_aggregation(self, mock_message_processor): |
| | | """测试强制处理聚合消息""" |
| | | mock_message_processor.process_single_message.return_value = True |
| | | |
| | | message_data = { |
| | | "messageType": "80001", |
| | | "data": { |
| | | "fromUser": "wxid_test123", |
| | | "fromGroup": "group123@chatroom", |
| | | "content": "测试消息", |
| | | "self": False |
| | | } |
| | | } |
| | | |
| | | # 添加消息到聚合 |
| | | self.aggregator.add_message_to_aggregation(message_data) |
| | | |
| | | # 强制处理聚合 |
| | | result = self.aggregator.force_process_aggregation("wxid_test123", "group123@chatroom") |
| | | assert result is True |
| | | |
| | | # 验证消息处理器被调用 |
| | | mock_message_processor.process_single_message.assert_called_once() |
| | | |
| | | # 检查聚合已被清理 |
| | | status = self.aggregator.get_aggregation_status() |
| | | assert status["active_aggregations"] == 0 |
| | | |
| | | def test_force_process_aggregation_not_found(self): |
| | | """测试强制处理不存在的聚合""" |
| | | result = self.aggregator.force_process_aggregation("wxid_test123", "group123@chatroom") |
| | | assert result is False |
| | | |
| | | |
| | | class TestPendingMessage: |
| | | """待聚合消息测试类""" |
| | | |
| | | def test_pending_message_creation(self): |
| | | """测试待聚合消息创建""" |
| | | message_data = {"test": "data"} |
| | | timestamp = time.time() |
| | | |
| | | pending_message = PendingMessage( |
| | | message_data=message_data, |
| | | timestamp=timestamp, |
| | | content="测试内容", |
| | | from_user="wxid_test123", |
| | | from_group="group123@chatroom" |
| | | ) |
| | | |
| | | assert pending_message.message_data == message_data |
| | | assert pending_message.timestamp == timestamp |
| | | assert pending_message.content == "测试内容" |
| | | assert pending_message.from_user == "wxid_test123" |
| | | assert pending_message.from_group == "group123@chatroom" |
| | | |
| | | |
| | | class TestMessageAggregation: |
| | | """消息聚合对象测试类""" |
| | | |
| | | def test_message_aggregation_creation(self): |
| | | """测试消息聚合对象创建""" |
| | | aggregation = MessageAggregation( |
| | | from_user="wxid_test123", |
| | | from_group="group123@chatroom" |
| | | ) |
| | | |
| | | assert aggregation.from_user == "wxid_test123" |
| | | assert aggregation.from_group == "group123@chatroom" |
| | | assert len(aggregation.messages) == 0 |
| | | assert aggregation.first_message_time == 0.0 |
| | | assert aggregation.last_message_time == 0.0 |
| | | assert aggregation.timer is None |
| | | |
| | | def test_add_message(self): |
| | | """测试添加消息到聚合""" |
| | | aggregation = MessageAggregation( |
| | | from_user="wxid_test123", |
| | | from_group="group123@chatroom" |
| | | ) |
| | | |
| | | timestamp = time.time() |
| | | pending_message = PendingMessage( |
| | | message_data={"test": "data"}, |
| | | timestamp=timestamp, |
| | | content="测试内容", |
| | | from_user="wxid_test123", |
| | | from_group="group123@chatroom" |
| | | ) |
| | | |
| | | aggregation.add_message(pending_message) |
| | | |
| | | assert len(aggregation.messages) == 1 |
| | | assert aggregation.first_message_time == timestamp |
| | | assert aggregation.last_message_time == timestamp |
| | | |
| | | def test_get_aggregated_content(self): |
| | | """测试获取聚合后的内容""" |
| | | aggregation = MessageAggregation( |
| | | from_user="wxid_test123", |
| | | from_group="group123@chatroom" |
| | | ) |
| | | |
| | | # 添加多条消息 |
| | | for i in range(3): |
| | | pending_message = PendingMessage( |
| | | message_data={"test": f"data{i}"}, |
| | | timestamp=time.time() + i, |
| | | content=f"消息{i+1}", |
| | | from_user="wxid_test123", |
| | | from_group="group123@chatroom" |
| | | ) |
| | | aggregation.add_message(pending_message) |
| | | |
| | | aggregated_content = aggregation.get_aggregated_content() |
| | | assert aggregated_content == "消息1\n消息2\n消息3" |
| | | |
| | | def test_get_latest_message_data(self): |
| | | """测试获取最新消息数据""" |
| | | aggregation = MessageAggregation( |
| | | from_user="wxid_test123", |
| | | from_group="group123@chatroom" |
| | | ) |
| | | |
| | | # 添加多条消息 |
| | | for i in range(3): |
| | | pending_message = PendingMessage( |
| | | message_data={"test": f"data{i}", "timestamp": i}, |
| | | timestamp=time.time() + i, |
| | | content=f"消息{i+1}", |
| | | from_user="wxid_test123", |
| | | from_group="group123@chatroom" |
| | | ) |
| | | aggregation.add_message(pending_message) |
| | | |
| | | latest_data = aggregation.get_latest_message_data() |
| | | assert latest_data["test"] == "data2" # 最后一条消息 |
| | | assert latest_data["timestamp"] == 2 |
| | | |
| | | def test_clear(self): |
| | | """测试清空聚合数据""" |
| | | aggregation = MessageAggregation( |
| | | from_user="wxid_test123", |
| | | from_group="group123@chatroom" |
| | | ) |
| | | |
| | | # 添加消息 |
| | | pending_message = PendingMessage( |
| | | message_data={"test": "data"}, |
| | | timestamp=time.time(), |
| | | content="测试内容", |
| | | from_user="wxid_test123", |
| | | from_group="group123@chatroom" |
| | | ) |
| | | aggregation.add_message(pending_message) |
| | | |
| | | # 设置定时器 |
| | | aggregation.timer = threading.Timer(1, lambda: None) |
| | | |
| | | # 清空 |
| | | aggregation.clear() |
| | | |
| | | assert len(aggregation.messages) == 0 |
| | | assert aggregation.first_message_time == 0.0 |
| | | assert aggregation.last_message_time == 0.0 |
| | | assert aggregation.timer is None |