| | |
| | | import json |
| | | import time |
| | | import re |
| | | import xml.etree.ElementTree as ET |
| | | from typing import Dict, Any, Optional, List, Tuple |
| | | from datetime import datetime |
| | | from sqlalchemy.orm import Session |
| | |
| | | |
def __init__(self):
    """Create the instance; no state is initialised here."""
| | | |
def extract_refer_message_content(self, callback_data: Dict[str, Any]) -> str:
    """Build the text of a quoted ("refer") message (message type 80014).

    The ``content`` field of ``callback_data["data"]`` is parsed as XML.
    The quoted text is read from ``appmsg/refermsg/content`` and the title
    from ``appmsg/title`` (both relative to the XML root). If the XML has
    no title, ``data["title"]`` is used instead.

    Args:
        callback_data: Callback payload; only its ``"data"`` mapping is read
            (keys ``"content"`` and ``"title"``).

    Returns:
        ``"<quoted text>\\n\\n<title>"`` when both parts are present, the
        single non-empty part when only one is, and the raw ``content``
        when neither could be extracted or an unexpected error occurred.
        A missing or null ``data``/``content`` yields ``""``.
    """
    # Normalise the payload before anything can fail, so the error path
    # below has a safe value to return and never raises itself.
    data = callback_data.get("data") if isinstance(callback_data, dict) else None
    if not isinstance(data, dict):
        data = {}
    # ``or ""`` also covers keys that are present but null; a null title
    # used to break the length logging and discard the extracted quote.
    content = data.get("content") or ""
    title = data.get("title") or ""

    try:
        refer_content = ""
        xml_title = ""
        try:
            # NOTE(security): xml.etree.ElementTree is not hardened against
            # maliciously constructed XML; if callbacks can carry untrusted
            # payloads, consider a hardened parser here.
            root = ET.fromstring(content)

            appmsg = root.find("appmsg")
            if appmsg is not None:
                # findtext() returns None/"" for a missing or empty element.
                xml_title = (appmsg.findtext("title") or "").strip()

                refermsg = appmsg.find("refermsg")
                if refermsg is not None:
                    refer_content = (refermsg.findtext("content") or "").strip()

        except ET.ParseError as e:
            logger.warning(f"XML解析失败: {str(e)}, content={content}")
            # Not XML: treat the whole content as the quoted text.
            refer_content = content

        # The title embedded in the XML wins over the one in the payload.
        final_title = xml_title if xml_title else title

        # Quoted text first, then the title, separated by one blank line;
        # fall back to the raw content when both parts are empty.
        parts = [part for part in (refer_content, final_title) if part]
        combined_content = "\n\n".join(parts) if parts else content

        logger.info(f"引用消息内容提取完成: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}")
        return combined_content

    except Exception as e:
        # Best effort: any unexpected failure degrades to the raw content.
        logger.error(f"提取引用消息内容异常: error={str(e)}")
        return content
| | | |
| | | def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]: |
| | | """ |
| | |
| | | Returns: |
| | | 是否是有效的群聊消息 |
| | | """ |
| | | # 检查消息类型是否是群聊消息(80001) |
| | | # 检查消息类型是否是群聊消息(80001)或引用消息(80014) |
| | | message_type = callback_data.get("messageType") |
| | | if message_type != "80001": |
| | | print(f"data: {callback_data}") |
| | | if message_type not in ["80001", "80014"]: |
| | | logger.info(f"忽略非群聊消息: messageType={message_type}") |
| | | return False |
| | | |
| | |
| | | if not self.is_valid_group_message(callback_data): |
| | | return False |
| | | |
| | | data = callback_data.get("data", {}) |
| | | # 导入消息聚合服务(延迟导入避免循环依赖) |
| | | from app.services.message_aggregator import message_aggregator |
| | | |
| | | # 尝试将消息添加到聚合中 |
| | | should_process_immediately, aggregated_data = message_aggregator.add_message_to_aggregation(callback_data) |
| | | |
| | | if should_process_immediately: |
| | | # 需要立即处理(不聚合或聚合超时) |
| | | data = aggregated_data.get("data", {}) if aggregated_data else callback_data.get("data", {}) |
| | | from_user = data.get("fromUser") |
| | | |
| | | # 将整个回调内容转成JSON字符串存储到Redis队列 |
| | | return redis_queue.enqueue_message(from_user, callback_data) |
| | | # 将消息加入Redis队列 |
| | | return redis_queue.enqueue_message(from_user, aggregated_data or callback_data) |
| | | else: |
| | | # 消息已被聚合,不需要立即处理 |
| | | logger.info(f"消息已添加到聚合队列,等待聚合处理: fromUser={callback_data.get('data', {}).get('fromUser')}") |
| | | return True |
| | | |
| | | def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool: |
| | | """ |
| | |
| | | from_group = data.get("fromGroup") |
| | | content = data.get("content") |
| | | w_id = data.get("wId") |
| | | message_type = message_data.get("messageType") |
| | | |
| | | logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}") |
| | | logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}, message_type={message_type}") |
| | | |
| | | # 根据消息类型处理内容 |
| | | if message_type == "80014": |
| | | # 引用消息,需要提取XML中的内容并与title组合 |
| | | content = self.extract_refer_message_content(message_data) |
| | | logger.info(f"引用消息内容处理完成: content_length={len(content)}") |
| | | |
| | | # 使用上下文管理器确保数据库会话正确管理 |
| | | with next(get_db()) as db: |