"""
|
消息处理核心逻辑
|
"""
|
|
import json
|
import time
|
import re
|
import xml.etree.ElementTree as ET
|
from typing import Dict, Any, Optional, List, Tuple
|
from datetime import datetime
|
from sqlalchemy.orm import Session
|
from loguru import logger
|
|
from app.models.database import get_db
|
from app.models.contact import Contact
|
from app.models.conversation import Conversation
|
from app.services.redis_queue import redis_queue
|
from app.services.ecloud_client import ecloud_client
|
from app.services.dify_client import dify_client
|
from app.services.friend_ignore_service import friend_ignore_service
|
from app.services.silence_service import silence_service
|
from app.services.group_stats_service import group_stats_service
|
from config import settings
|
|
|
class MessageProcessor:
    """Core message processor: validates group callbacks, queues them, answers them via Dify."""

    # Callback messageType values handled as group messages: plain group text and quote/reply.
    GROUP_MESSAGE_TYPE = "80001"
    REFER_MESSAGE_TYPE = "80014"
    GROUP_MESSAGE_TYPES = (GROUP_MESSAGE_TYPE, REFER_MESSAGE_TYPE)

    # Passed as the last argument of redis_queue.set_conversation_id —
    # presumably a TTL in seconds (30 min); TODO confirm against redis_queue.
    CONVERSATION_ID_TTL = 1800

    # "@" followed by a run of non-whitespace characters.
    _AT_MENTION_PATTERN = re.compile(r'@([^\s]+)')

    def __init__(self):
        # No per-instance state: collaborators are module-level objects imported at file top.
        pass

    def extract_refer_message_content(self, callback_data: Dict[str, Any]) -> str:
        """
        Extract the text of a quote/reply message (messageType 80014).

        ``data.content`` is parsed as XML and the code looks for
        ``<msg><appmsg><title>...</title><refermsg><content>...</content></refermsg></appmsg></msg>``.
        The quoted text (``appmsg/refermsg/content``) and the title (the XML
        ``appmsg/title`` if present, otherwise ``data.title``) are joined with a
        blank line between them.

        Args:
            callback_data: Raw callback payload; ``data.content`` and ``data.title`` are read.

        Returns:
            ``"<quoted>\\n\\n<title>"`` when both parts exist, whichever single part
            exists otherwise, or the raw ``content`` when neither was found. If
            ``content`` is not valid XML, the raw ``content`` is used as the quoted
            part. On any unexpected error the raw ``content`` (or ``""``) is returned.
        """
        try:
            # `or` guards against keys that are present but explicitly null.
            data = callback_data.get("data") or {}
            content = data.get("content") or ""
            title = data.get("title") or ""

            refer_content = ""
            xml_title = ""
            if content:
                try:
                    # NOTE(review): content comes from an external callback; the stdlib
                    # XML parsers are not hardened against maliciously constructed XML.
                    root = ET.fromstring(content)
                except ET.ParseError as e:
                    logger.warning(f"XML解析失败: {str(e)}, content={content}")
                    # Not XML: treat the raw content as the quoted text.
                    refer_content = content
                else:
                    appmsg = root.find("appmsg")
                    if appmsg is not None:
                        # findtext returns None when the element is missing, "" when it has no text.
                        xml_title = (appmsg.findtext("title") or "").strip()
                        refer_content = (appmsg.findtext("refermsg/content") or "").strip()

            # Prefer the title embedded in the XML, fall back to data.title.
            final_title = xml_title or title

            parts = [part for part in (refer_content, final_title) if part]
            combined_content = "\n\n".join(parts) if parts else content

            logger.info(f"引用消息内容提取完成: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}")

            return combined_content

        except Exception as e:
            logger.error(f"提取引用消息内容异常: error={str(e)}")
            # Fall back to the raw content without risking a second failure on a null `data`.
            data = callback_data.get("data") or {}
            return data.get("content") or ""

    def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]:
        """
        Find ``@name`` mentions of configured customer-service agents in an AI reply.

        A mention is ``@`` followed by a run of non-whitespace characters. It only
        counts when that run is contained in ``settings.customer_service_names``.
        Each valid name is resolved to a ``wc_id`` by looking up ``Contact.nick_name``.

        Args:
            ai_answer: The AI reply text.

        Returns:
            ``(ai_answer, at_wc_ids)``. The text is returned unchanged. ``at_wc_ids``
            lists the resolved contact ids in first-mention order, without
            duplicates. On any error ``(ai_answer, [])`` is returned.
        """
        if not ai_answer:
            return ai_answer, []

        try:
            customer_service_names = settings.customer_service_names

            # Keep first-mention order but drop repeats, so an agent mentioned
            # twice is queried once and @-ed once.
            valid_at_names: List[str] = []
            for match in self._AT_MENTION_PATTERN.findall(ai_answer):
                if match in customer_service_names and match not in valid_at_names:
                    valid_at_names.append(match)
                    logger.info(f"发现有效的@客服名称: {match}")

            at_wc_ids: List[str] = []
            if valid_at_names:
                # NOTE(review): assumes get_db() yields a SQLAlchemy Session usable as a
                # context manager; the generator's own cleanup is left to GC — confirm.
                with next(get_db()) as db:
                    for name in valid_at_names:
                        contact = db.query(Contact).filter(Contact.nick_name == name).first()
                        if contact:
                            if contact.wc_id not in at_wc_ids:
                                at_wc_ids.append(contact.wc_id)
                            logger.info(f"找到客服联系人: name={name}, wc_id={contact.wc_id}")
                        else:
                            logger.warning(f"未找到客服联系人: name={name}")

            return ai_answer, at_wc_ids

        except Exception as e:
            logger.error(f"解析@字符异常: error={str(e)}")
            return ai_answer, []

    def is_valid_group_message(self, callback_data: Dict[str, Any]) -> bool:
        """
        Decide whether a callback should be handled as a group message.

        The checks run in order, and the first failing one returns False:

        1. ``messageType`` must be 80001 (group message) or 80014 (quote message).
        2. The message must not have been sent by this account (``data.self``).
        3. ``fromUser``, ``fromGroup`` and ``content`` must all be present.
        4. The sender must not be on the friend-ignore list. An ignored sender's
           message is still counted in group stats, and it activates or extends
           the group's silence mode.
        5. The group's silence mode must not be active.

        Args:
            callback_data: Raw callback payload.

        Returns:
            True if the message should be processed.
        """
        # Full payload goes to the debug log instead of stdout.
        logger.debug(f"data: {callback_data}")

        message_type = callback_data.get("messageType")
        if message_type not in self.GROUP_MESSAGE_TYPES:
            logger.info(f"忽略非群聊消息: messageType={message_type}")
            return False

        data = callback_data.get("data") or {}
        if data.get("self", False):
            logger.info(f"忽略自己发送的消息: fromUser={data.get('fromUser')}")
            return False

        from_user = data.get("fromUser")
        from_group = data.get("fromGroup")
        if not from_user or not from_group or not data.get("content"):
            logger.warning(f"消息缺少必要字段: data={data}")
            return False

        if friend_ignore_service.is_friend_ignored(from_user):
            logger.info(f"忽略好友发送的消息: fromUser={from_user}, fromGroup={from_group}")
            # Ignored friends still count toward the group's message statistics.
            group_stats_service.increment_user_message_count(from_group, from_user)
            # A message from an ignored friend activates (or refreshes) the group's silence mode.
            if silence_service.is_silence_active(from_group):
                silence_service.extend_silence_mode(from_group)
                logger.info(f"好友消息被忽略,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}")
            else:
                silence_service.activate_silence_mode(from_group)
                logger.info(f"好友消息被忽略,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}")
            return False

        # Checked after the friend-ignore branch, which may itself have just activated silence mode.
        if silence_service.is_silence_active(from_group):
            logger.info(f"群组静默模式激活中,忽略消息: fromGroup={from_group}")
            return False

        return True

    def enqueue_callback_message(self, callback_data: Dict[str, Any]) -> bool:
        """
        Validate a callback and hand it to the aggregator or the Redis queue.

        Args:
            callback_data: Raw callback payload.

        Returns:
            False if the message failed validation. If the aggregator says to
            process now, returns the result of ``redis_queue.enqueue_message``
            (called with the aggregated payload when one is returned). Returns
            True when the message was absorbed into an aggregation.
        """
        if not self.is_valid_group_message(callback_data):
            return False

        # Deferred import avoids a circular dependency with the aggregator module.
        from app.services.message_aggregator import message_aggregator

        should_process_immediately, aggregated_data = message_aggregator.add_message_to_aggregation(callback_data)

        if not should_process_immediately:
            logger.info(f"消息已添加到聚合队列,等待聚合处理: fromUser={callback_data.get('data', {}).get('fromUser')}")
            return True

        payload = aggregated_data or callback_data
        from_user = (payload.get("data") or {}).get("fromUser")
        return redis_queue.enqueue_message(from_user, payload)

    def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool:
        """
        Make sure a ``Contact`` row exists for the group ``from_group``.

        If no row with ``wc_id == from_group`` exists, the details are fetched
        with ``ecloud_client.get_contact_info``, and the row is inserted and committed.

        Args:
            from_group: Group id, stored as ``Contact.wc_id``.
            w_id: Login-instance id, passed through to the E-Cloud API.
            db: Active database session.

        Returns:
            True if the contact already existed or was created. False if the API
            returned nothing, or if any error occurred (the session is rolled back).
        """
        try:
            existing_contact = db.query(Contact).filter(Contact.wc_id == from_group).first()
            if existing_contact:
                logger.info(f"联系人已存在: wc_id={from_group}")
                return True

            contact_info = ecloud_client.get_contact_info(w_id, from_group)
            if not contact_info:
                logger.error(f"无法获取联系人信息: wc_id={from_group}")
                return False

            new_contact = Contact(
                wc_id=from_group,
                user_name=contact_info.get("userName"),
                nick_name=contact_info.get("nickName"),
                remark=contact_info.get("remark"),
                signature=contact_info.get("signature"),
                sex=contact_info.get("sex"),
                alias_name=contact_info.get("aliasName"),
                country=contact_info.get("country"),
                big_head=contact_info.get("bigHead"),
                small_head=contact_info.get("smallHead"),
                label_list=contact_info.get("labelList"),
                v1=contact_info.get("v1"),
            )
            db.add(new_contact)
            db.commit()

            logger.info(
                f"成功保存联系人信息: wc_id={from_group}, nick_name={contact_info.get('nickName')}"
            )
            return True

        except Exception as e:
            logger.error(f"确保联系人存在失败: wc_id={from_group}, error={str(e)}")
            db.rollback()
            return False

    def _save_conversation_turn(
        self,
        db: Session,
        from_user: str,
        from_group: str,
        conversation_id: str,
        hour_key: str,
        user_content: Any,
        ai_answer: str,
    ) -> Conversation:
        """
        Append one user/AI turn to the matching hourly ``Conversation`` record.

        The record is matched on (user, conversation id, group, hour) and created
        if missing. ``content`` is stored as a JSON array of ``{"user", "ai"}``
        objects. Stored content that cannot be decoded, or that is not a JSON
        array, is discarded and restarted. The session is NOT committed here.

        Returns:
            The new or updated ``Conversation`` object (attached to ``db``).
        """
        turn = {"user": user_content, "ai": ai_answer}

        existing_conversation = (
            db.query(Conversation)
            .filter(
                Conversation.from_user == from_user,
                Conversation.conversation_id == conversation_id,
                Conversation.group == from_group,
                Conversation.hour == hour_key,
            )
            .first()
        )

        if existing_conversation is None:
            new_conversation = Conversation(
                from_user=from_user,
                conversation_id=conversation_id,
                group=from_group,
                hour=hour_key,
                content=json.dumps([turn], ensure_ascii=False),
                is_processed=True,
            )
            db.add(new_conversation)
            logger.info(f"创建新的用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 初始对话轮次=1")
            return new_conversation

        content_list: List[Any] = []
        if existing_conversation.content:
            try:
                loaded = json.loads(existing_conversation.content)
            except json.JSONDecodeError as e:
                logger.error(f"解析现有对话内容JSON失败: {str(e)}, 重新创建")
            else:
                if isinstance(loaded, list):
                    content_list = loaded
                else:
                    # A non-array would crash on append; restart it like undecodable content.
                    logger.error("现有对话内容不是JSON数组, 重新创建")

        content_list.append(turn)
        existing_conversation.content = json.dumps(content_list, ensure_ascii=False)
        existing_conversation.is_processed = True
        logger.info(f"追加到当前用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 对话轮次={len(content_list)}")
        return existing_conversation

    def _send_ai_answer(
        self,
        w_id: str,
        from_group: str,
        from_user: str,
        ai_answer: str,
        max_attempts: int = 3,
    ) -> bool:
        """
        Send the AI answer to the group and retry failures with exponential backoff.

        If the answer @-mentions configured agents, the group @-message API is
        used; otherwise the plain group-message API is used. Between attempts
        the method sleeps 1s, 2s, ... (``2 ** attempt``).

        Returns:
            True as soon as one attempt succeeds; False after ``max_attempts`` failures.
        """
        processed_answer, at_wc_ids = self.parse_at_mentions(ai_answer)

        for attempt in range(max_attempts):
            if at_wc_ids:
                logger.info(f"使用群聊@接口发送消息: at_wc_ids={at_wc_ids}")
                sent = ecloud_client.send_group_at_message(
                    w_id, from_group, processed_answer, at_wc_ids
                )
            else:
                logger.info("使用普通群聊接口发送消息")
                sent = ecloud_client.send_group_message(
                    w_id, from_group, processed_answer
                )

            if sent:
                return True

            logger.warning(f"发送AI回答失败,尝试重试 ({attempt + 1}/{max_attempts}): from_user={from_user}")
            if attempt < max_attempts - 1:
                time.sleep(2 ** attempt)

        return False

    def process_single_message(self, message_data: Dict[str, Any]) -> bool:
        """
        Process one queued (possibly aggregated) message end to end.

        Steps:

        1. Ensure the group contact exists.
        2. Ask Dify for an answer, reusing the user's per-group conversation id
           from Redis.
        3. Store the new conversation id in Redis.
        4. Append the exchange to the hourly ``Conversation`` record.
        5. Send the answer to the group and mark the record as sent.

        Args:
            message_data: Payload containing ``messageType`` and
                ``data.fromUser``/``fromGroup``/``content``/``wId``.

        Returns:
            True only if the answer was delivered to the group. Returns False on
            any failure, including an empty AI answer.
        """
        # Bound before `try` so the error log in `except` can never raise NameError.
        from_user = None
        try:
            data = message_data.get("data") or {}
            from_user = data.get("fromUser")
            from_group = data.get("fromGroup")
            content = data.get("content")
            w_id = data.get("wId")
            message_type = message_data.get("messageType")

            logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}, message_type={message_type}")

            if message_type == self.REFER_MESSAGE_TYPE:
                # Quote message: combine the quoted XML text with the reply title.
                content = self.extract_refer_message_content(message_data)
                logger.info(f"引用消息内容处理完成: content_length={len(content)}")

            # NOTE(review): assumes get_db() yields a SQLAlchemy Session usable as a
            # context manager; the generator's own cleanup is left to GC — confirm.
            with next(get_db()) as db:
                if not self.ensure_contact_exists(from_group, w_id, db):
                    logger.error(f"联系人信息处理失败: from_group={from_group}")
                    return False

                most_active_nickname = group_stats_service.get_most_active_user_nickname(from_group)
                logger.info(f"群组最活跃用户昵称: group={from_group}, nickname={most_active_nickname}")

                conversation_id = redis_queue.get_conversation_id(from_user, from_group)

                dify_response = dify_client.send_message(
                    query=content,
                    user=from_user,
                    conversation_id=conversation_id,
                    nick_name=most_active_nickname,
                )
                if not dify_response:
                    logger.error(f"Dify响应失败: from_user={from_user}")
                    return False

                ai_answer = dify_response.get("answer", "")
                new_conversation_id = dify_response.get("conversation_id", "")

                # Conversation ids are tracked per (user, group).
                if new_conversation_id:
                    redis_queue.set_conversation_id(
                        from_user, new_conversation_id, from_group, self.CONVERSATION_ID_TTL
                    )

                # Records are bucketed per user, group and hour.
                hour_key = datetime.now().strftime("%Y%m%d_%H")
                conversation = self._save_conversation_turn(
                    db, from_user, from_group, new_conversation_id, hour_key, content, ai_answer
                )
                db.commit()

                if not (ai_answer and self._send_ai_answer(w_id, from_group, from_user, ai_answer)):
                    logger.error(f"发送AI回答失败: from_user={from_user}")
                    return False

                # Reuse the record we just wrote instead of querying for it again;
                # stamp the time the send actually succeeded.
                conversation.is_sent = True
                conversation.sent_time = datetime.now()
                db.commit()

                logger.info(f"消息处理完成: from_user={from_user}")
                return True

        except Exception as e:
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception(f"处理消息异常: from_user={from_user}, error={str(e)}")
            return False
|
|
|
# Shared module-level MessageProcessor instance used by importers of this module.
message_processor: MessageProcessor = MessageProcessor()
|