yj
2025-08-07 2e391d599d08ea7a7c11442bc2845a1191494c3d
app/services/message_processor.py
@@ -5,6 +5,7 @@
import json
import time
import re
import xml.etree.ElementTree as ET
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime
from sqlalchemy.orm import Session
@@ -27,6 +28,69 @@
    def __init__(self):
        pass
    def extract_refer_message_content(self, callback_data: Dict[str, Any]) -> str:
        """
        提取引用消息的内容(message_type为80014)
        Args:
            callback_data: 回调数据
        Returns:
            组合后的消息内容(content + title,中间空一行)
        """
        try:
            data = callback_data.get("data", {})
            content = data.get("content", "")
            title = data.get("title", "")
            # 解析XML内容,提取msg>appmsg>refermsg>content标签中的内容
            refer_content = ""
            xml_title = ""
            try:
                # 解析XML
                root = ET.fromstring(content)
                # 查找msg>appmsg>refermsg>content路径
                appmsg = root.find("appmsg")
                if appmsg is not None:
                    # 提取XML中的title(如果存在)
                    title_element = appmsg.find("title")
                    if title_element is not None and title_element.text:
                        xml_title = title_element.text.strip()
                    # 提取引用消息内容
                    refermsg = appmsg.find("refermsg")
                    if refermsg is not None:
                        content_element = refermsg.find("content")
                        if content_element is not None and content_element.text:
                            refer_content = content_element.text.strip()
            except ET.ParseError as e:
                logger.warning(f"XML解析失败: {str(e)}, content={content}")
                # 如果XML解析失败,使用原始content
                refer_content = content
            # 确定最终使用的title:优先使用XML中的title,其次使用data.title
            final_title = xml_title if xml_title else title
            # 组合内容:refer_content在前,final_title在后,中间空一行
            if refer_content and final_title:
                combined_content = f"{refer_content}\n\n{final_title}"
            elif refer_content:
                combined_content = refer_content
            elif final_title:
                combined_content = final_title
            else:
                combined_content = content  # 如果都没有,使用原始content
            logger.info(f"引用消息内容提取完成: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}")
            return combined_content
        except Exception as e:
            logger.error(f"提取引用消息内容异常: error={str(e)}")
            # 异常情况下返回原始content
            return callback_data.get("data", {}).get("content", "")
    def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]:
        """
@@ -83,9 +147,10 @@
        Returns:
            是否是有效的群聊消息
        """
        # 检查消息类型是否是群聊消息(80001)
        # 检查消息类型是否是群聊消息(80001)或引用消息(80014)
        message_type = callback_data.get("messageType")
        if message_type != "80001":
        print(f"data: {callback_data}")
        if message_type not in ["80001", "80014"]:
            logger.info(f"忽略非群聊消息: messageType={message_type}")
            return False
@@ -150,11 +215,23 @@
        if not self.is_valid_group_message(callback_data):
            return False
        data = callback_data.get("data", {})
        # 导入消息聚合服务(延迟导入避免循环依赖)
        from app.services.message_aggregator import message_aggregator
        # 尝试将消息添加到聚合中
        should_process_immediately, aggregated_data = message_aggregator.add_message_to_aggregation(callback_data)
        if should_process_immediately:
            # 需要立即处理(不聚合或聚合超时)
            data = aggregated_data.get("data", {}) if aggregated_data else callback_data.get("data", {})
        from_user = data.get("fromUser")
        # 将整个回调内容转成JSON字符串存储到Redis队列
        return redis_queue.enqueue_message(from_user, callback_data)
            # 将消息加入Redis队列
            return redis_queue.enqueue_message(from_user, aggregated_data or callback_data)
        else:
            # 消息已被聚合,不需要立即处理
            logger.info(f"消息已添加到聚合队列,等待聚合处理: fromUser={callback_data.get('data', {}).get('fromUser')}")
            return True
    def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool:
        """
@@ -230,8 +307,15 @@
            from_group = data.get("fromGroup")
            content = data.get("content")
            w_id = data.get("wId")
            message_type = message_data.get("messageType")
            logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}")
            logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}, message_type={message_type}")
            # 根据消息类型处理内容
            if message_type == "80014":
                # 引用消息,需要提取XML中的内容并与title组合
                content = self.extract_refer_message_content(message_data)
                logger.info(f"引用消息内容处理完成: content_length={len(content)}")
            # 使用上下文管理器确保数据库会话正确管理
            with next(get_db()) as db: