"""
|
消息处理核心逻辑
|
"""
|
|
import json
|
from typing import Dict, Any, Optional
|
from sqlalchemy.orm import Session
|
from loguru import logger
|
|
from app.models.database import get_db
|
from app.models.contact import Contact
|
from app.models.conversation import Conversation
|
from app.services.redis_queue import redis_queue
|
from app.services.ecloud_client import ecloud_client
|
from app.services.dify_client import dify_client
|
|
|
class MessageProcessor:
|
"""消息处理器"""
|
|
def __init__(self):
|
pass
|
|
def is_valid_group_message(self, callback_data: Dict[str, Any]) -> bool:
|
"""
|
检查是否是有效的群聊消息
|
|
Args:
|
callback_data: 回调数据
|
|
Returns:
|
是否是有效的群聊消息
|
"""
|
# 检查消息类型是否是群聊消息(80001)
|
message_type = callback_data.get("messageType")
|
if message_type != "80001":
|
logger.info(f"忽略非群聊消息: messageType={message_type}")
|
return False
|
|
# 检查是否是自己发送的消息
|
data = callback_data.get("data", {})
|
if data.get("self", False):
|
logger.info(f"忽略自己发送的消息: fromUser={data.get('fromUser')}")
|
return False
|
|
# 检查必要字段
|
if (
|
not data.get("fromUser")
|
or not data.get("fromGroup")
|
or not data.get("content")
|
):
|
logger.warning(f"消息缺少必要字段: data={data}")
|
return False
|
|
return True
|
|
def enqueue_callback_message(self, callback_data: Dict[str, Any]) -> bool:
|
"""
|
将回调消息加入队列
|
|
Args:
|
callback_data: 回调数据
|
|
Returns:
|
是否成功加入队列
|
"""
|
# 验证消息有效性
|
if not self.is_valid_group_message(callback_data):
|
return False
|
|
data = callback_data.get("data", {})
|
from_user = data.get("fromUser")
|
|
# 将整个回调内容转成JSON字符串存储到Redis队列
|
return redis_queue.enqueue_message(from_user, callback_data)
|
|
def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool:
|
"""
|
确保联系人信息存在于数据库中
|
|
Args:
|
from_group: 群组ID
|
w_id: 登录实例标识
|
db: 数据库会话
|
|
Returns:
|
是否成功确保联系人存在
|
"""
|
try:
|
# 检查数据库中是否已存在该联系人
|
existing_contact = (
|
db.query(Contact).filter(Contact.wc_id == from_group).first()
|
)
|
|
if existing_contact:
|
logger.info(f"联系人已存在: wc_id={from_group}")
|
return True
|
|
# 调用E云管家API获取联系人信息
|
contact_info = ecloud_client.get_contact_info(w_id, from_group)
|
|
if not contact_info:
|
logger.error(f"无法获取联系人信息: wc_id={from_group}")
|
return False
|
|
# 保存联系人信息到数据库
|
new_contact = Contact(
|
wc_id=from_group,
|
user_name=contact_info.get("userName"),
|
nick_name=contact_info.get("nickName"),
|
remark=contact_info.get("remark"),
|
signature=contact_info.get("signature"),
|
sex=contact_info.get("sex"),
|
alias_name=contact_info.get("aliasName"),
|
country=contact_info.get("country"),
|
big_head=contact_info.get("bigHead"),
|
small_head=contact_info.get("smallHead"),
|
label_list=contact_info.get("labelList"),
|
v1=contact_info.get("v1"),
|
)
|
|
db.add(new_contact)
|
db.commit()
|
|
logger.info(
|
f"成功保存联系人信息: wc_id={from_group}, nick_name={contact_info.get('nickName')}"
|
)
|
return True
|
|
except Exception as e:
|
logger.error(f"确保联系人存在失败: wc_id={from_group}, error={str(e)}")
|
db.rollback()
|
return False
|
|
def process_single_message(self, message_data: Dict[str, Any]) -> bool:
|
"""
|
处理单条消息
|
|
Args:
|
message_data: 消息数据
|
|
Returns:
|
是否处理成功
|
"""
|
try:
|
data = message_data.get("data", {})
|
from_user = data.get("fromUser")
|
from_group = data.get("fromGroup")
|
content = data.get("content")
|
w_id = data.get("wId")
|
|
logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}")
|
|
# 获取数据库会话
|
db = next(get_db())
|
|
try:
|
# 3.1 确保联系人信息存在
|
if not self.ensure_contact_exists(from_group, w_id, db):
|
logger.error(f"联系人信息处理失败: from_group={from_group}")
|
return False
|
|
# 3.2 获取用户的conversation_id
|
conversation_id = redis_queue.get_conversation_id(from_user)
|
|
# 调用Dify接口发送消息
|
dify_response = dify_client.send_chat_message(
|
query=content, user=from_user, conversation_id=conversation_id
|
)
|
|
if not dify_response:
|
logger.error(f"Dify响应失败: from_user={from_user}")
|
return False
|
|
# 获取AI回答和新的conversation_id
|
ai_answer = dify_response.get("answer", "")
|
new_conversation_id = dify_response.get("conversation_id", "")
|
|
# 更新Redis中的conversation_id
|
if new_conversation_id:
|
redis_queue.set_conversation_id(from_user, new_conversation_id)
|
|
# 3.3 保存对话记录到数据库
|
user_conversation_key = f"{from_user}_{new_conversation_id}"
|
|
# 检查是否已存在记录
|
existing_conversation = (
|
db.query(Conversation)
|
.filter(Conversation.user_conversation_key == user_conversation_key)
|
.first()
|
)
|
|
if existing_conversation:
|
# 更新现有记录
|
existing_conversation.user_question = content
|
existing_conversation.ai_answer = ai_answer
|
existing_conversation.is_processed = True
|
logger.info(f"更新对话记录: key={user_conversation_key}")
|
else:
|
# 创建新记录
|
new_conversation = Conversation(
|
user_conversation_key=user_conversation_key,
|
from_user=from_user,
|
conversation_id=new_conversation_id,
|
user_question=content,
|
ai_answer=ai_answer,
|
is_processed=True,
|
)
|
db.add(new_conversation)
|
logger.info(f"创建对话记录: key={user_conversation_key}")
|
|
db.commit()
|
|
# 发送AI回答到群聊
|
if ai_answer and ecloud_client.send_group_message(
|
w_id, from_group, ai_answer
|
):
|
# 更新发送状态
|
conversation = (
|
db.query(Conversation)
|
.filter(
|
Conversation.user_conversation_key == user_conversation_key
|
)
|
.first()
|
)
|
if conversation:
|
conversation.is_sent = True
|
db.commit()
|
|
logger.info(f"消息处理完成: from_user={from_user}")
|
return True
|
else:
|
logger.error(f"发送AI回答失败: from_user={from_user}")
|
return False
|
|
finally:
|
db.close()
|
|
except Exception as e:
|
logger.error(f"处理消息异常: from_user={from_user}, error={str(e)}")
|
return False
|
|
|
# 全局消息处理器实例
|
message_processor = MessageProcessor()
|