""" 消息处理核心逻辑 """ import json from typing import Dict, Any, Optional from sqlalchemy.orm import Session from loguru import logger from app.models.database import get_db from app.models.contact import Contact from app.models.conversation import Conversation from app.services.redis_queue import redis_queue from app.services.ecloud_client import ecloud_client from app.services.dify_client import dify_client class MessageProcessor: """消息处理器""" def __init__(self): pass def is_valid_group_message(self, callback_data: Dict[str, Any]) -> bool: """ 检查是否是有效的群聊消息 Args: callback_data: 回调数据 Returns: 是否是有效的群聊消息 """ # 检查消息类型是否是群聊消息(80001) message_type = callback_data.get("messageType") if message_type != "80001": logger.info(f"忽略非群聊消息: messageType={message_type}") return False # 检查是否是自己发送的消息 data = callback_data.get("data", {}) if data.get("self", False): logger.info(f"忽略自己发送的消息: fromUser={data.get('fromUser')}") return False # 检查必要字段 if ( not data.get("fromUser") or not data.get("fromGroup") or not data.get("content") ): logger.warning(f"消息缺少必要字段: data={data}") return False return True def enqueue_callback_message(self, callback_data: Dict[str, Any]) -> bool: """ 将回调消息加入队列 Args: callback_data: 回调数据 Returns: 是否成功加入队列 """ # 验证消息有效性 if not self.is_valid_group_message(callback_data): return False data = callback_data.get("data", {}) from_user = data.get("fromUser") # 将整个回调内容转成JSON字符串存储到Redis队列 return redis_queue.enqueue_message(from_user, callback_data) def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool: """ 确保联系人信息存在于数据库中 Args: from_group: 群组ID w_id: 登录实例标识 db: 数据库会话 Returns: 是否成功确保联系人存在 """ try: # 检查数据库中是否已存在该联系人 existing_contact = ( db.query(Contact).filter(Contact.wc_id == from_group).first() ) if existing_contact: logger.info(f"联系人已存在: wc_id={from_group}") return True # 调用E云管家API获取联系人信息 contact_info = ecloud_client.get_contact_info(w_id, from_group) if not contact_info: logger.error(f"无法获取联系人信息: wc_id={from_group}") return False # 保存联系人信息到数据库 new_contact = Contact( wc_id=from_group, user_name=contact_info.get("userName"), nick_name=contact_info.get("nickName"), remark=contact_info.get("remark"), signature=contact_info.get("signature"), sex=contact_info.get("sex"), alias_name=contact_info.get("aliasName"), country=contact_info.get("country"), big_head=contact_info.get("bigHead"), small_head=contact_info.get("smallHead"), label_list=contact_info.get("labelList"), v1=contact_info.get("v1"), ) db.add(new_contact) db.commit() logger.info( f"成功保存联系人信息: wc_id={from_group}, nick_name={contact_info.get('nickName')}" ) return True except Exception as e: logger.error(f"确保联系人存在失败: wc_id={from_group}, error={str(e)}") db.rollback() return False def process_single_message(self, message_data: Dict[str, Any]) -> bool: """ 处理单条消息 Args: message_data: 消息数据 Returns: 是否处理成功 """ try: data = message_data.get("data", {}) from_user = data.get("fromUser") from_group = data.get("fromGroup") content = data.get("content") w_id = data.get("wId") logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}") # 获取数据库会话 db = next(get_db()) try: # 3.1 确保联系人信息存在 if not self.ensure_contact_exists(from_group, w_id, db): logger.error(f"联系人信息处理失败: from_group={from_group}") return False # 3.2 获取用户的conversation_id conversation_id = redis_queue.get_conversation_id(from_user) # 调用Dify接口发送消息 dify_response = dify_client.send_chat_message( query=content, user=from_user, conversation_id=conversation_id ) if not dify_response: logger.error(f"Dify响应失败: from_user={from_user}") return False # 获取AI回答和新的conversation_id ai_answer = dify_response.get("answer", "") new_conversation_id = dify_response.get("conversation_id", "") # 更新Redis中的conversation_id if new_conversation_id: redis_queue.set_conversation_id(from_user, new_conversation_id) # 3.3 保存对话记录到数据库 user_conversation_key = f"{from_user}_{new_conversation_id}" # 检查是否已存在记录 existing_conversation = ( db.query(Conversation) .filter(Conversation.user_conversation_key == user_conversation_key) .first() ) if existing_conversation: # 更新现有记录 existing_conversation.user_question = content existing_conversation.ai_answer = ai_answer existing_conversation.is_processed = True logger.info(f"更新对话记录: key={user_conversation_key}") else: # 创建新记录 new_conversation = Conversation( user_conversation_key=user_conversation_key, from_user=from_user, conversation_id=new_conversation_id, user_question=content, ai_answer=ai_answer, is_processed=True, ) db.add(new_conversation) logger.info(f"创建对话记录: key={user_conversation_key}") db.commit() # 发送AI回答到群聊 if ai_answer and ecloud_client.send_group_message( w_id, from_group, ai_answer ): # 更新发送状态 conversation = ( db.query(Conversation) .filter( Conversation.user_conversation_key == user_conversation_key ) .first() ) if conversation: conversation.is_sent = True db.commit() logger.info(f"消息处理完成: from_user={from_user}") return True else: logger.error(f"发送AI回答失败: from_user={from_user}") return False finally: db.close() except Exception as e: logger.error(f"处理消息异常: from_user={from_user}, error={str(e)}") return False # 全局消息处理器实例 message_processor = MessageProcessor()