yj
2025-09-03 f28ac0166536a2a4b68cac685a41ea667f60f7e9
app/services/message_processor.py
@@ -5,6 +5,7 @@
import json
import time
import re
import xml.etree.ElementTree as ET
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime
from sqlalchemy.orm import Session
@@ -28,7 +29,72 @@
    def __init__(self):
        pass
    def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]:
    def extract_refer_message_content(self, callback_data: Dict[str, Any]) -> str:
        """
        提取引用消息的内容(message_type为80014)
        Args:
            callback_data: 回调数据
        Returns:
            组合后的消息内容(content + title,中间空一行)
        """
        try:
            data = callback_data.get("data", {})
            content = data.get("content", "")
            title = data.get("title", "")
            # 解析XML内容,提取msg>appmsg>refermsg>content标签中的内容
            refer_content = ""
            xml_title = ""
            try:
                # 解析XML
                root = ET.fromstring(content)
                # 查找msg>appmsg>refermsg>content路径
                appmsg = root.find("appmsg")
                if appmsg is not None:
                    # 提取XML中的title(如果存在)
                    title_element = appmsg.find("title")
                    if title_element is not None and title_element.text:
                        xml_title = title_element.text.strip()
                    # 提取引用消息内容
                    refermsg = appmsg.find("refermsg")
                    if refermsg is not None:
                        content_element = refermsg.find("content")
                        if content_element is not None and content_element.text:
                            refer_content = content_element.text.strip()
            except ET.ParseError as e:
                logger.warning(f"XML解析失败: {str(e)}, content={content}")
                # 如果XML解析失败,使用原始content
                refer_content = content
            # 确定最终使用的title:优先使用XML中的title,其次使用data.title
            final_title = xml_title if xml_title else title
            # 组合内容:refer_content在前,final_title在后,中间空一行
            if refer_content and final_title:
                combined_content = f"{refer_content}\n\n{final_title}"
            elif refer_content:
                combined_content = refer_content
            elif final_title:
                combined_content = final_title
            else:
                combined_content = content  # 如果都没有,使用原始content
            logger.info(
                f"引用消息内容提取完成: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}"
            )
            return combined_content
        except Exception as e:
            logger.error(f"提取引用消息内容异常: error={str(e)}")
            # 异常情况下返回原始content
            return callback_data.get("data", {}).get("content", "")
    def parse_at_mentions(self, ai_answer: str, from_user: str) -> Tuple[str, List[str]]:
        """
        解析AI回复中的@字符,提取客服名称
@@ -43,7 +109,7 @@
            customer_service_names = settings.customer_service_names
            # 查找所有@字符后的客服名称
            at_pattern = r'@([^\s]+)'
            at_pattern = r"@([^\s]+)"
            matches = re.findall(at_pattern, ai_answer)
            valid_at_names = []
@@ -60,10 +126,21 @@
                with next(get_db()) as db:
                    for name in valid_at_names:
                        # 根据nick_name查找联系人
                        contact = db.query(Contact).filter(Contact.nick_name == name).first()
                        contact = (
                            db.query(Contact).filter(Contact.nick_name == name).first()
                        )
                        if contact:
                            at_wc_ids.append(contact.wc_id)
                            logger.info(f"找到客服联系人: name={name}, wc_id={contact.wc_id}")
                            # 如果from_user包含@openim表示是企业微信,使用work_wc_id
                            if "@openim" in from_user:
                                at_wc_ids.append(contact.work_wc_id)
                                logger.info(
                                    f"找到客服联系人-微信: name={name}, wc_id={contact.work_wc_id}"
                                )
                            else:
                                at_wc_ids.append(contact.wc_id)
                                logger.info(
                                    f"找到客服联系人-企业微信: name={name}, wc_id={contact.wc_id}"
                                )
                        else:
                            logger.warning(f"未找到客服联系人: name={name}")
@@ -72,6 +149,28 @@
        except Exception as e:
            logger.error(f"解析@字符异常: error={str(e)}")
            return ai_answer, []
    def is_end_str(self, ai_answer: str) -> bool:
        """
        解析AI回复判断是否是结束字符串
        Args:
            ai_answer: AI回复内容
        Returns:
        """
        try:
            # 获取配置的结束字符串列表
            end_str_list = settings.end_str_list
            for end_str in end_str_list:
                substrings = end_str.split(',')
                # 检查 match_str 是否包含每一个子字符串
                if all(sub in ai_answer for sub in substrings):
                    return True
            return False
        except Exception as e:
            logger.error(f"解析结束字符串异常: error={str(e)}")
            return False
    def is_valid_group_message(self, callback_data: Dict[str, Any]) -> bool:
        """
@@ -83,9 +182,10 @@
        Returns:
            是否是有效的群聊消息
        """
        # 检查消息类型是否是群聊消息(80001)
        # 检查消息类型是否是群聊消息(80001)或引用消息(80014)
        message_type = callback_data.get("messageType")
        if message_type != "80001":
        print(f"data: {callback_data}")
        if message_type not in ["80001", "80014"]:
            logger.info(f"忽略非群聊消息: messageType={message_type}")
            return False
@@ -108,22 +208,28 @@
        from_user = data.get("fromUser")
        from_group = data.get("fromGroup")
        # 检查发送者是否在好友忽略列表中
        is_friend_ignored = friend_ignore_service.is_friend_ignored(from_user)
        # 检查发送者是否在好友忽略列表中(传入群组ID用于测试群组检查)
        is_friend_ignored = friend_ignore_service.is_friend_ignored(from_user, from_group)
        if is_friend_ignored:
            logger.info(f"忽略好友发送的消息: fromUser={from_user}, fromGroup={from_group}")
            logger.info(
                f"忽略好友发送的消息: fromUser={from_user}, fromGroup={from_group}"
            )
            # 统计被忽略的好友发言次数(确保被忽略的好友消息也纳入统计)
            group_stats_service.increment_user_message_count(from_group, from_user)
            # 激活或延长该群组的静默模式
            if silence_service.is_silence_active(from_group):
                # 如果该群组静默模式已激活,延长时间
                silence_service.extend_silence_mode(from_group)
                logger.info(f"好友消息被忽略,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}")
                logger.info(
                    f"好友消息被忽略,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}"
                )
            else:
                # 如果该群组静默模式未激活,激活静默模式
                silence_service.activate_silence_mode(from_group)
                logger.info(f"好友消息被忽略,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}")
                logger.info(
                    f"好友消息被忽略,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}"
                )
            return False
        # # 统计正常处理的好友发言次数
@@ -150,11 +256,33 @@
        if not self.is_valid_group_message(callback_data):
            return False
        data = callback_data.get("data", {})
        from_user = data.get("fromUser")
        # 导入消息聚合服务(延迟导入避免循环依赖)
        from app.services.message_aggregator import message_aggregator
        # 将整个回调内容转成JSON字符串存储到Redis队列
        return redis_queue.enqueue_message(from_user, callback_data)
        # 尝试将消息添加到聚合中
        should_process_immediately, aggregated_data = (
            message_aggregator.add_message_to_aggregation(callback_data)
        )
        if should_process_immediately:
            # 需要立即处理(不聚合或聚合超时)
            data = (
                aggregated_data.get("data", {})
                if aggregated_data
                else callback_data.get("data", {})
            )
            from_user = data.get("fromUser")
            # 将消息加入Redis队列
            return redis_queue.enqueue_message(
                from_user, aggregated_data or callback_data
            )
        else:
            # 消息已被聚合,不需要立即处理
            logger.info(
                f"消息已添加到聚合队列,等待聚合处理: fromUser={callback_data.get('data', {}).get('fromUser')}"
            )
            return True
    def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool:
        """
@@ -230,8 +358,17 @@
            from_group = data.get("fromGroup")
            content = data.get("content")
            w_id = data.get("wId")
            message_type = message_data.get("messageType")
            logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}")
            logger.info(
                f"开始处理消息: from_user={from_user}, from_group={from_group}, message_type={message_type}"
            )
            # 根据消息类型处理内容
            if message_type == "80014":
                # 引用消息,需要提取XML中的内容并与title组合
                content = self.extract_refer_message_content(message_data)
                logger.info(f"引用消息内容处理完成: content_length={len(content)}")
            # 使用上下文管理器确保数据库会话正确管理
            with next(get_db()) as db:
@@ -241,8 +378,14 @@
                    return False
                # 3.2 获取群组中发言次数最多的用户昵称
                most_active_nickname = group_stats_service.get_most_active_user_nickname(from_group)
                logger.info(f"群组最活跃用户昵称: group={from_group}, nickname={most_active_nickname}")
                # most_active_nickname = (
                #     group_stats_service.get_most_active_user_nickname(from_group)
                # )
                # logger.info(
                #     f"群组最活跃用户昵称: group={from_group}, nickname={most_active_nickname}"
                # )
                # 获取默认客服名称
                nickname = settings.customer_service_default_name
                # 3.3 获取用户在当前群组的conversation_id
                conversation_id = redis_queue.get_conversation_id(from_user, from_group)
@@ -252,8 +395,13 @@
                    query=content,
                    user=from_user,
                    conversation_id=conversation_id,
                    nick_name=most_active_nickname
                    nick_name=nickname,
                )
                if silence_service.is_silence_active(from_group):
                    # 回复前判断是否激活静默,已静默则不回复
                    logger.error(f"Dify已响应但群组已静默:from_user={from_user}")
                    return False
                if not dify_response:
                    logger.error(f"Dify响应失败: from_user={from_user}")
@@ -265,7 +413,9 @@
                # 更新Redis中的conversation_id(基于用户+群组)
                if new_conversation_id:
                    redis_queue.set_conversation_id(from_user, new_conversation_id, from_group, 1800)
                    redis_queue.set_conversation_id(
                        from_user, new_conversation_id, from_group, settings.silence_duration_minutes * 60
                    )
                # 3.4 保存对话记录到数据库
                # 按用户、群组和小时分组对话记录
@@ -279,7 +429,7 @@
                        Conversation.from_user == from_user,
                        Conversation.conversation_id == new_conversation_id,
                        Conversation.group == from_group,
                        Conversation.hour == hour_key
                        Conversation.hour == hour_key,
                    )
                    .first()
                )
@@ -294,20 +444,23 @@
                            content_list = []
                        # 追加新的对话内容
                        content_list.append({
                            "user": content,
                            "ai": ai_answer
                        })
                        content_list.append({"user": content, "ai": ai_answer})
                        # 更新记录
                        existing_conversation.content = json.dumps(content_list, ensure_ascii=False)
                        existing_conversation.content = json.dumps(
                            content_list, ensure_ascii=False
                        )
                        existing_conversation.is_processed = True
                        logger.info(f"追加到当前用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 对话轮次={len(content_list)}")
                        logger.info(
                            f"追加到当前用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 对话轮次={len(content_list)}"
                        )
                    except json.JSONDecodeError as e:
                        logger.error(f"解析现有对话内容JSON失败: {str(e)}, 重新创建")
                        # 如果JSON解析失败,重新创建content
                        content_list = [{"user": content, "ai": ai_answer}]
                        existing_conversation.content = json.dumps(content_list, ensure_ascii=False)
                        existing_conversation.content = json.dumps(
                            content_list, ensure_ascii=False
                        )
                        existing_conversation.is_processed = True
                else:
                    # 创建新记录 - 新的用户群组小时对话或首次对话,使用JSON格式存储对话内容
@@ -321,7 +474,9 @@
                        is_processed=True,
                    )
                    db.add(new_conversation)
                    logger.info(f"创建新的用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 初始对话轮次=1")
                    logger.info(
                        f"创建新的用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 初始对话轮次=1"
                    )
                db.commit()
@@ -329,8 +484,9 @@
                success = False
                if ai_answer:
                    # 解析AI回复中的@字符
                    processed_answer, at_wc_ids = self.parse_at_mentions(ai_answer)
                    processed_answer, at_wc_ids = self.parse_at_mentions(ai_answer, from_user)
                    # 判断AI回复是否是结束字符串
                    is_end_str = self.is_end_str(ai_answer)
                    # 发送消息,最多重试3次
                    for attempt in range(3):
                        if at_wc_ids:
@@ -339,6 +495,24 @@
                            if ecloud_client.send_group_at_message(
                                w_id, from_group, processed_answer, at_wc_ids
                            ):
                                # @后触发静默模式
                                if silence_service.is_silence_active(from_group):
                                    # 如果该群组静默模式已激活,延长时间
                                    silence_service.extend_silence_mode(from_group)
                                    logger.info(
                                        f"AI回复@客服,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}"
                                    )
                                else:
                                    # 如果该群组静默模式未激活,激活静默模式
                                    flag = silence_service.activate_silence_mode(from_group)
                                    if flag:
                                        logger.info(
                                        f"AI回复@客服,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}"
                                        )
                                    else:
                                        logger.info(
                                        f"AI回复@客服,未激活静默模式: fromUser={from_user}, fromGroup={from_group}"
                                        )
                                success = True
                                break
                        else:
@@ -350,10 +524,27 @@
                                success = True
                                break
                        logger.warning(f"发送AI回答失败,尝试重试 ({attempt + 1}/3): from_user={from_user}")
                        logger.warning(
                            f"发送AI回答失败,尝试重试 ({attempt + 1}/3): from_user={from_user}"
                        )
                        if attempt < 2:  # 不是最后一次尝试,等待一段时间再重试
                            time.sleep(2 ** attempt)  # 指数退避
                            time.sleep(2**attempt)  # 指数退避
                    # AI回复结束字符串触发静默模式
                    if is_end_str:
                        if silence_service.is_silence_active(from_group):
                            # 如果该群组静默模式已激活,延长时间
                            silence_service.extend_silence_mode(from_group)
                            logger.info(
                                f"AI回复结束字符串,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}"
                            )
                        else:
                            # 如果该群组静默模式未激活,激活静默模式
                            silence_service.activate_silence_mode(from_group)
                            logger.info(
                                f"AI回复结束字符串,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}"
                            )
                if success:
                    # 更新发送状态
                    conversation = (
@@ -362,7 +553,7 @@
                            Conversation.from_user == from_user,
                            Conversation.conversation_id == new_conversation_id,
                            Conversation.group == from_group,
                            Conversation.hour == hour_key
                            Conversation.hour == hour_key,
                        )
                        .first()
                    )