| | |
| | | import json |
| | | import time |
| | | import re |
| | | import xml.etree.ElementTree as ET |
| | | from typing import Dict, Any, Optional, List, Tuple |
| | | from datetime import datetime |
| | | from sqlalchemy.orm import Session |
| | |
| | | from app.services.redis_queue import redis_queue |
| | | from app.services.ecloud_client import ecloud_client |
| | | from app.services.dify_client import dify_client |
| | | from app.services.friend_ignore_service import friend_ignore_service |
| | | from app.services.silence_service import silence_service |
| | | from app.services.group_stats_service import group_stats_service |
| | | from config import settings |
| | | |
| | | |
| | |
| | | |
| | | def __init__(self): |
| | | pass |
| | | |
| | | def extract_refer_message_content(self, callback_data: Dict[str, Any]) -> str: |
| | | """ |
| | | 提取引用消息的内容(message_type为80014) |
| | | |
| | | Args: |
| | | callback_data: 回调数据 |
| | | |
| | | Returns: |
| | | 组合后的消息内容(content + title,中间空一行) |
| | | """ |
| | | try: |
| | | data = callback_data.get("data", {}) |
| | | content = data.get("content", "") |
| | | title = data.get("title", "") |
| | | |
| | | # 解析XML内容,提取msg>appmsg>refermsg>content标签中的内容 |
| | | refer_content = "" |
| | | xml_title = "" |
| | | try: |
| | | # 解析XML |
| | | root = ET.fromstring(content) |
| | | |
| | | # 查找msg>appmsg>refermsg>content路径 |
| | | appmsg = root.find("appmsg") |
| | | if appmsg is not None: |
| | | # 提取XML中的title(如果存在) |
| | | title_element = appmsg.find("title") |
| | | if title_element is not None and title_element.text: |
| | | xml_title = title_element.text.strip() |
| | | |
| | | # 提取引用消息内容 |
| | | refermsg = appmsg.find("refermsg") |
| | | if refermsg is not None: |
| | | content_element = refermsg.find("content") |
| | | if content_element is not None and content_element.text: |
| | | refer_content = content_element.text.strip() |
| | | |
| | | except ET.ParseError as e: |
| | | logger.warning(f"XML解析失败: {str(e)}, content={content}") |
| | | # 如果XML解析失败,使用原始content |
| | | refer_content = content |
| | | |
| | | # 确定最终使用的title:优先使用XML中的title,其次使用data.title |
| | | final_title = xml_title if xml_title else title |
| | | |
| | | # 组合内容:refer_content在前,final_title在后,中间空一行 |
| | | if refer_content and final_title: |
| | | combined_content = f"{refer_content}\n\n{final_title}" |
| | | elif refer_content: |
| | | combined_content = refer_content |
| | | elif final_title: |
| | | combined_content = final_title |
| | | else: |
| | | combined_content = content # 如果都没有,使用原始content |
| | | |
| | | logger.info(f"引用消息内容提取完成: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}") |
| | | return combined_content |
| | | |
| | | except Exception as e: |
| | | logger.error(f"提取引用消息内容异常: error={str(e)}") |
| | | # 异常情况下返回原始content |
| | | return callback_data.get("data", {}).get("content", "") |
| | | |
| | | def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]: |
| | | """ |
| | |
| | | Returns: |
| | | 是否是有效的群聊消息 |
| | | """ |
| | | # 检查消息类型是否是群聊消息(80001) |
| | | # 检查消息类型是否是群聊消息(80001)或引用消息(80014) |
| | | message_type = callback_data.get("messageType") |
| | | if message_type != "80001": |
| | | print(f"data: {callback_data}") |
| | | if message_type not in ["80001", "80014"]: |
| | | logger.info(f"忽略非群聊消息: messageType={message_type}") |
| | | return False |
| | | |
| | |
| | | logger.warning(f"消息缺少必要字段: data={data}") |
| | | return False |
| | | |
| | | # 获取用户和群组信息 |
| | | from_user = data.get("fromUser") |
| | | from_group = data.get("fromGroup") |
| | | |
| | | # 检查发送者是否在好友忽略列表中 |
| | | is_friend_ignored = friend_ignore_service.is_friend_ignored(from_user) |
| | | |
| | | if is_friend_ignored: |
| | | logger.info(f"忽略好友发送的消息: fromUser={from_user}, fromGroup={from_group}") |
| | | # 统计被忽略的好友发言次数(确保被忽略的好友消息也纳入统计) |
| | | group_stats_service.increment_user_message_count(from_group, from_user) |
| | | # 激活或延长该群组的静默模式 |
| | | if silence_service.is_silence_active(from_group): |
| | | # 如果该群组静默模式已激活,延长时间 |
| | | silence_service.extend_silence_mode(from_group) |
| | | logger.info(f"好友消息被忽略,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}") |
| | | else: |
| | | # 如果该群组静默模式未激活,激活静默模式 |
| | | silence_service.activate_silence_mode(from_group) |
| | | logger.info(f"好友消息被忽略,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}") |
| | | return False |
| | | |
| | | # # 统计正常处理的好友发言次数 |
| | | # group_stats_service.increment_user_message_count(from_group, from_user) |
| | | |
| | | # 检查该群组的静默模式是否激活(在好友忽略检查之后) |
| | | if silence_service.is_silence_active(from_group): |
| | | logger.info(f"群组静默模式激活中,忽略消息: fromGroup={from_group}") |
| | | return False |
| | | |
| | | return True |
| | | |
| | | def enqueue_callback_message(self, callback_data: Dict[str, Any]) -> bool: |
| | |
| | | if not self.is_valid_group_message(callback_data): |
| | | return False |
| | | |
| | | data = callback_data.get("data", {}) |
| | | from_user = data.get("fromUser") |
| | | # 导入消息聚合服务(延迟导入避免循环依赖) |
| | | from app.services.message_aggregator import message_aggregator |
| | | |
| | | # 将整个回调内容转成JSON字符串存储到Redis队列 |
| | | return redis_queue.enqueue_message(from_user, callback_data) |
| | | # 尝试将消息添加到聚合中 |
| | | should_process_immediately, aggregated_data = message_aggregator.add_message_to_aggregation(callback_data) |
| | | |
| | | if should_process_immediately: |
| | | # 需要立即处理(不聚合或聚合超时) |
| | | data = aggregated_data.get("data", {}) if aggregated_data else callback_data.get("data", {}) |
| | | from_user = data.get("fromUser") |
| | | |
| | | # 将消息加入Redis队列 |
| | | return redis_queue.enqueue_message(from_user, aggregated_data or callback_data) |
| | | else: |
| | | # 消息已被聚合,不需要立即处理 |
| | | logger.info(f"消息已添加到聚合队列,等待聚合处理: fromUser={callback_data.get('data', {}).get('fromUser')}") |
| | | return True |
| | | |
| | | def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool: |
| | | """ |
| | |
| | | from_group = data.get("fromGroup") |
| | | content = data.get("content") |
| | | w_id = data.get("wId") |
| | | message_type = message_data.get("messageType") |
| | | |
| | | logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}") |
| | | logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}, message_type={message_type}") |
| | | |
| | | # 根据消息类型处理内容 |
| | | if message_type == "80014": |
| | | # 引用消息,需要提取XML中的内容并与title组合 |
| | | content = self.extract_refer_message_content(message_data) |
| | | logger.info(f"引用消息内容处理完成: content_length={len(content)}") |
| | | |
| | | # 使用上下文管理器确保数据库会话正确管理 |
| | | with next(get_db()) as db: |
| | |
| | | logger.error(f"联系人信息处理失败: from_group={from_group}") |
| | | return False |
| | | |
| | | # 3.2 获取用户在当前群组的conversation_id |
| | | # 3.2 获取群组中发言次数最多的用户昵称 |
| | | most_active_nickname = group_stats_service.get_most_active_user_nickname(from_group) |
| | | logger.info(f"群组最活跃用户昵称: group={from_group}, nickname={most_active_nickname}") |
| | | |
| | | # 3.3 获取用户在当前群组的conversation_id |
| | | conversation_id = redis_queue.get_conversation_id(from_user, from_group) |
| | | |
| | | # 调用Dify接口发送消息 |
| | | dify_response = dify_client.send_chat_message( |
| | | query=content, user=from_user, conversation_id=conversation_id |
| | | # 调用Dify接口发送消息(根据配置选择模式) |
| | | dify_response = dify_client.send_message( |
| | | query=content, |
| | | user=from_user, |
| | | conversation_id=conversation_id, |
| | | nick_name=most_active_nickname |
| | | ) |
| | | |
| | | if not dify_response: |
| | |
| | | |
| | | # 更新Redis中的conversation_id(基于用户+群组) |
| | | if new_conversation_id: |
| | | redis_queue.set_conversation_id(from_user, new_conversation_id, from_group) |
| | | redis_queue.set_conversation_id(from_user, new_conversation_id, from_group, 1800) |
| | | |
| | | # 3.3 保存对话记录到数据库 |
| | | # 3.4 保存对话记录到数据库 |
| | | # 按用户、群组和小时分组对话记录 |
| | | current_time = datetime.now() |
| | | hour_key = current_time.strftime("%Y%m%d_%H") |