yj
2025-07-28 69945b730fd3f6b6138ce50e49fc3392fcd74d71
app/services/message_processor.py
@@ -3,7 +3,10 @@
"""
import json
from typing import Dict, Any, Optional
import time
import re
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime
from sqlalchemy.orm import Session
from loguru import logger
@@ -13,6 +16,10 @@
from app.services.redis_queue import redis_queue
from app.services.ecloud_client import ecloud_client
from app.services.dify_client import dify_client
from app.services.friend_ignore_service import friend_ignore_service
from app.services.silence_service import silence_service
from app.services.group_stats_service import group_stats_service
from config import settings
class MessageProcessor:
@@ -20,6 +27,51 @@
    def __init__(self):
        pass
    def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]:
        """
        解析AI回复中的@字符,提取客服名称
        Args:
            ai_answer: AI回复内容
        Returns:
            (处理后的消息内容, 需要@的客服wcid列表)
        """
        try:
            # 获取配置的客服名称列表
            customer_service_names = settings.customer_service_names
            # 查找所有@字符后的客服名称
            at_pattern = r'@([^\s]+)'
            matches = re.findall(at_pattern, ai_answer)
            valid_at_names = []
            at_wc_ids = []
            for match in matches:
                # 检查是否在配置的客服名称列表中
                if match in customer_service_names:
                    valid_at_names.append(match)
                    logger.info(f"发现有效的@客服名称: {match}")
            # 如果有有效的@客服名称,查询数据库获取wcid
            if valid_at_names:
                with next(get_db()) as db:
                    for name in valid_at_names:
                        # 根据nick_name查找联系人
                        contact = db.query(Contact).filter(Contact.nick_name == name).first()
                        if contact:
                            at_wc_ids.append(contact.wc_id)
                            logger.info(f"找到客服联系人: name={name}, wc_id={contact.wc_id}")
                        else:
                            logger.warning(f"未找到客服联系人: name={name}")
            return ai_answer, at_wc_ids
        except Exception as e:
            logger.error(f"解析@字符异常: error={str(e)}")
            return ai_answer, []
    def is_valid_group_message(self, callback_data: Dict[str, Any]) -> bool:
        """
@@ -50,6 +102,36 @@
            or not data.get("content")
        ):
            logger.warning(f"消息缺少必要字段: data={data}")
            return False
        # 获取用户和群组信息
        from_user = data.get("fromUser")
        from_group = data.get("fromGroup")
        # 检查发送者是否在好友忽略列表中
        is_friend_ignored = friend_ignore_service.is_friend_ignored(from_user)
        if is_friend_ignored:
            logger.info(f"忽略好友发送的消息: fromUser={from_user}, fromGroup={from_group}")
            # 统计被忽略的好友发言次数(确保被忽略的好友消息也纳入统计)
            group_stats_service.increment_user_message_count(from_group, from_user)
            # 激活或延长该群组的静默模式
            if silence_service.is_silence_active(from_group):
                # 如果该群组静默模式已激活,延长时间
                silence_service.extend_silence_mode(from_group)
                logger.info(f"好友消息被忽略,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}")
            else:
                # 如果该群组静默模式未激活,激活静默模式
                silence_service.activate_silence_mode(from_group)
                logger.info(f"好友消息被忽略,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}")
            return False
        # # 统计正常处理的好友发言次数
        # group_stats_service.increment_user_message_count(from_group, from_user)
        # 检查该群组的静默模式是否激活(在好友忽略检查之后)
        if silence_service.is_silence_active(from_group):
            logger.info(f"群组静默模式激活中,忽略消息: fromGroup={from_group}")
            return False
        return True
@@ -151,21 +233,26 @@
            logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}")
            # 获取数据库会话
            db = next(get_db())
            try:
            # 使用上下文管理器确保数据库会话正确管理
            with next(get_db()) as db:
                # 3.1 确保联系人信息存在
                if not self.ensure_contact_exists(from_group, w_id, db):
                    logger.error(f"联系人信息处理失败: from_group={from_group}")
                    return False
                # 3.2 获取用户的conversation_id
                conversation_id = redis_queue.get_conversation_id(from_user)
                # 3.2 获取群组中发言次数最多的用户昵称
                most_active_nickname = group_stats_service.get_most_active_user_nickname(from_group)
                logger.info(f"群组最活跃用户昵称: group={from_group}, nickname={most_active_nickname}")
                # 调用Dify接口发送消息
                dify_response = dify_client.send_chat_message(
                    query=content, user=from_user, conversation_id=conversation_id
                # 3.3 获取用户在当前群组的conversation_id
                conversation_id = redis_queue.get_conversation_id(from_user, from_group)
                # 调用Dify接口发送消息(根据配置选择模式)
                dify_response = dify_client.send_message(
                    query=content,
                    user=from_user,
                    conversation_id=conversation_id,
                    nick_name=most_active_nickname
                )
                if not dify_response:
@@ -176,55 +263,112 @@
                ai_answer = dify_response.get("answer", "")
                new_conversation_id = dify_response.get("conversation_id", "")
                # 更新Redis中的conversation_id
                # 更新Redis中的conversation_id(基于用户+群组)
                if new_conversation_id:
                    redis_queue.set_conversation_id(from_user, new_conversation_id)
                    redis_queue.set_conversation_id(from_user, new_conversation_id, from_group, 1800)
                # 3.3 保存对话记录到数据库
                user_conversation_key = f"{from_user}_{new_conversation_id}"
                # 3.4 保存对话记录到数据库
                # 按用户、群组和小时分组对话记录
                current_time = datetime.now()
                hour_key = current_time.strftime("%Y%m%d_%H")
                # 检查是否已存在记录
                # 查找当前用户在当前群组当前小时的对话记录
                existing_conversation = (
                    db.query(Conversation)
                    .filter(Conversation.user_conversation_key == user_conversation_key)
                    .filter(
                        Conversation.from_user == from_user,
                        Conversation.conversation_id == new_conversation_id,
                        Conversation.group == from_group,
                        Conversation.hour == hour_key
                    )
                    .first()
                )
                if existing_conversation:
                    # 更新现有记录
                    existing_conversation.user_question = content
                    existing_conversation.ai_answer = ai_answer
                    existing_conversation.is_processed = True
                    logger.info(f"更新对话记录: key={user_conversation_key}")
                    # 更新现有记录 - 使用JSON格式追加对话内容(当前用户在当前群组当前小时的对话)
                    try:
                        # 解析现有的content JSON
                        if existing_conversation.content:
                            content_list = json.loads(existing_conversation.content)
                        else:
                            content_list = []
                        # 追加新的对话内容
                        content_list.append({
                            "user": content,
                            "ai": ai_answer
                        })
                        # 更新记录
                        existing_conversation.content = json.dumps(content_list, ensure_ascii=False)
                        existing_conversation.is_processed = True
                        logger.info(f"追加到当前用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 对话轮次={len(content_list)}")
                    except json.JSONDecodeError as e:
                        logger.error(f"解析现有对话内容JSON失败: {str(e)}, 重新创建")
                        # 如果JSON解析失败,重新创建content
                        content_list = [{"user": content, "ai": ai_answer}]
                        existing_conversation.content = json.dumps(content_list, ensure_ascii=False)
                        existing_conversation.is_processed = True
                else:
                    # 创建新记录
                    # 创建新记录 - 新的用户群组小时对话或首次对话,使用JSON格式存储对话内容
                    content_list = [{"user": content, "ai": ai_answer}]
                    new_conversation = Conversation(
                        user_conversation_key=user_conversation_key,
                        from_user=from_user,
                        conversation_id=new_conversation_id,
                        user_question=content,
                        ai_answer=ai_answer,
                        group=from_group,
                        hour=hour_key,
                        content=json.dumps(content_list, ensure_ascii=False),
                        is_processed=True,
                    )
                    db.add(new_conversation)
                    logger.info(f"创建对话记录: key={user_conversation_key}")
                    logger.info(f"创建新的用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 初始对话轮次=1")
                db.commit()
                # 发送AI回答到群聊
                if ai_answer and ecloud_client.send_group_message(
                    w_id, from_group, ai_answer
                ):
                success = False
                if ai_answer:
                    # 解析AI回复中的@字符
                    processed_answer, at_wc_ids = self.parse_at_mentions(ai_answer)
                    # 发送消息,最多重试3次
                    for attempt in range(3):
                        if at_wc_ids:
                            # 如果有@客服,使用群聊@接口
                            logger.info(f"使用群聊@接口发送消息: at_wc_ids={at_wc_ids}")
                            if ecloud_client.send_group_at_message(
                                w_id, from_group, processed_answer, at_wc_ids
                            ):
                                success = True
                                break
                        else:
                            # 普通群聊消息
                            logger.info("使用普通群聊接口发送消息")
                            if ecloud_client.send_group_message(
                                w_id, from_group, processed_answer
                            ):
                                success = True
                                break
                        logger.warning(f"发送AI回答失败,尝试重试 ({attempt + 1}/3): from_user={from_user}")
                        if attempt < 2:  # 不是最后一次尝试,等待一段时间再重试
                            time.sleep(2 ** attempt)  # 指数退避
                if success:
                    # 更新发送状态
                    conversation = (
                        db.query(Conversation)
                        .filter(
                            Conversation.user_conversation_key == user_conversation_key
                            Conversation.from_user == from_user,
                            Conversation.conversation_id == new_conversation_id,
                            Conversation.group == from_group,
                            Conversation.hour == hour_key
                        )
                        .first()
                    )
                    if conversation:
                        conversation.is_sent = True
                        conversation.sent_time = current_time
                        db.commit()
                    logger.info(f"消息处理完成: from_user={from_user}")
@@ -232,9 +376,6 @@
                else:
                    logger.error(f"发送AI回答失败: from_user={from_user}")
                    return False
            finally:
                db.close()
        except Exception as e:
            logger.error(f"处理消息异常: from_user={from_user}, error={str(e)}")