yj
2025-08-07 2e391d599d08ea7a7c11442bc2845a1191494c3d
app/services/message_processor.py
@@ -5,6 +5,7 @@
import json
import time
import re
import xml.etree.ElementTree as ET
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime
from sqlalchemy.orm import Session
@@ -16,6 +17,9 @@
from app.services.redis_queue import redis_queue
from app.services.ecloud_client import ecloud_client
from app.services.dify_client import dify_client
from app.services.friend_ignore_service import friend_ignore_service
from app.services.silence_service import silence_service
from app.services.group_stats_service import group_stats_service
from config import settings
@@ -24,6 +28,69 @@
    def __init__(self):
        pass
    def extract_refer_message_content(self, callback_data: Dict[str, Any]) -> str:
        """
        提取引用消息的内容(message_type为80014)
        Args:
            callback_data: 回调数据
        Returns:
            组合后的消息内容(content + title,中间空一行)
        """
        try:
            data = callback_data.get("data", {})
            content = data.get("content", "")
            title = data.get("title", "")
            # 解析XML内容,提取msg>appmsg>refermsg>content标签中的内容
            refer_content = ""
            xml_title = ""
            try:
                # 解析XML
                root = ET.fromstring(content)
                # 查找msg>appmsg>refermsg>content路径
                appmsg = root.find("appmsg")
                if appmsg is not None:
                    # 提取XML中的title(如果存在)
                    title_element = appmsg.find("title")
                    if title_element is not None and title_element.text:
                        xml_title = title_element.text.strip()
                    # 提取引用消息内容
                    refermsg = appmsg.find("refermsg")
                    if refermsg is not None:
                        content_element = refermsg.find("content")
                        if content_element is not None and content_element.text:
                            refer_content = content_element.text.strip()
            except ET.ParseError as e:
                logger.warning(f"XML解析失败: {str(e)}, content={content}")
                # 如果XML解析失败,使用原始content
                refer_content = content
            # 确定最终使用的title:优先使用XML中的title,其次使用data.title
            final_title = xml_title if xml_title else title
            # 组合内容:refer_content在前,final_title在后,中间空一行
            if refer_content and final_title:
                combined_content = f"{refer_content}\n\n{final_title}"
            elif refer_content:
                combined_content = refer_content
            elif final_title:
                combined_content = final_title
            else:
                combined_content = content  # 如果都没有,使用原始content
            logger.info(f"引用消息内容提取完成: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}")
            return combined_content
        except Exception as e:
            logger.error(f"提取引用消息内容异常: error={str(e)}")
            # 异常情况下返回原始content
            return callback_data.get("data", {}).get("content", "")
    def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]:
        """
@@ -80,9 +147,10 @@
        Returns:
            是否是有效的群聊消息
        """
        # 检查消息类型是否是群聊消息(80001)
        # 检查消息类型是否是群聊消息(80001)或引用消息(80014)
        message_type = callback_data.get("messageType")
        if message_type != "80001":
        print(f"data: {callback_data}")
        if message_type not in ["80001", "80014"]:
            logger.info(f"忽略非群聊消息: messageType={message_type}")
            return False
@@ -101,6 +169,36 @@
            logger.warning(f"消息缺少必要字段: data={data}")
            return False
        # 获取用户和群组信息
        from_user = data.get("fromUser")
        from_group = data.get("fromGroup")
        # 检查发送者是否在好友忽略列表中
        is_friend_ignored = friend_ignore_service.is_friend_ignored(from_user)
        if is_friend_ignored:
            logger.info(f"忽略好友发送的消息: fromUser={from_user}, fromGroup={from_group}")
            # 统计被忽略的好友发言次数(确保被忽略的好友消息也纳入统计)
            group_stats_service.increment_user_message_count(from_group, from_user)
            # 激活或延长该群组的静默模式
            if silence_service.is_silence_active(from_group):
                # 如果该群组静默模式已激活,延长时间
                silence_service.extend_silence_mode(from_group)
                logger.info(f"好友消息被忽略,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}")
            else:
                # 如果该群组静默模式未激活,激活静默模式
                silence_service.activate_silence_mode(from_group)
                logger.info(f"好友消息被忽略,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}")
            return False
        # # 统计正常处理的好友发言次数
        # group_stats_service.increment_user_message_count(from_group, from_user)
        # 检查该群组的静默模式是否激活(在好友忽略检查之后)
        if silence_service.is_silence_active(from_group):
            logger.info(f"群组静默模式激活中,忽略消息: fromGroup={from_group}")
            return False
        return True
    def enqueue_callback_message(self, callback_data: Dict[str, Any]) -> bool:
@@ -117,11 +215,23 @@
        if not self.is_valid_group_message(callback_data):
            return False
        data = callback_data.get("data", {})
        from_user = data.get("fromUser")
        # 导入消息聚合服务(延迟导入避免循环依赖)
        from app.services.message_aggregator import message_aggregator
        # 将整个回调内容转成JSON字符串存储到Redis队列
        return redis_queue.enqueue_message(from_user, callback_data)
        # 尝试将消息添加到聚合中
        should_process_immediately, aggregated_data = message_aggregator.add_message_to_aggregation(callback_data)
        if should_process_immediately:
            # 需要立即处理(不聚合或聚合超时)
            data = aggregated_data.get("data", {}) if aggregated_data else callback_data.get("data", {})
            from_user = data.get("fromUser")
            # 将消息加入Redis队列
            return redis_queue.enqueue_message(from_user, aggregated_data or callback_data)
        else:
            # 消息已被聚合,不需要立即处理
            logger.info(f"消息已添加到聚合队列,等待聚合处理: fromUser={callback_data.get('data', {}).get('fromUser')}")
            return True
    def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool:
        """
@@ -197,8 +307,15 @@
            from_group = data.get("fromGroup")
            content = data.get("content")
            w_id = data.get("wId")
            message_type = message_data.get("messageType")
            logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}")
            logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}, message_type={message_type}")
            # 根据消息类型处理内容
            if message_type == "80014":
                # 引用消息,需要提取XML中的内容并与title组合
                content = self.extract_refer_message_content(message_data)
                logger.info(f"引用消息内容处理完成: content_length={len(content)}")
            # 使用上下文管理器确保数据库会话正确管理
            with next(get_db()) as db:
@@ -207,12 +324,19 @@
                    logger.error(f"联系人信息处理失败: from_group={from_group}")
                    return False
                # 3.2 获取用户在当前群组的conversation_id
                # 3.2 获取群组中发言次数最多的用户昵称
                most_active_nickname = group_stats_service.get_most_active_user_nickname(from_group)
                logger.info(f"群组最活跃用户昵称: group={from_group}, nickname={most_active_nickname}")
                # 3.3 获取用户在当前群组的conversation_id
                conversation_id = redis_queue.get_conversation_id(from_user, from_group)
                # 调用Dify接口发送消息
                dify_response = dify_client.send_chat_message(
                    query=content, user=from_user, conversation_id=conversation_id
                # 调用Dify接口发送消息(根据配置选择模式)
                dify_response = dify_client.send_message(
                    query=content,
                    user=from_user,
                    conversation_id=conversation_id,
                    nick_name=most_active_nickname
                )
                if not dify_response:
@@ -225,9 +349,9 @@
                # 更新Redis中的conversation_id(基于用户+群组)
                if new_conversation_id:
                    redis_queue.set_conversation_id(from_user, new_conversation_id, from_group)
                    redis_queue.set_conversation_id(from_user, new_conversation_id, from_group, 1800)
                # 3.3 保存对话记录到数据库
                # 3.4 保存对话记录到数据库
                # 按用户、群组和小时分组对话记录
                current_time = datetime.now()
                hour_key = current_time.strftime("%Y%m%d_%H")