| | |
| | | import json |
| | | import time |
| | | import re |
| | | import xml.etree.ElementTree as ET |
| | | from typing import Dict, Any, Optional, List, Tuple |
| | | from datetime import datetime |
| | | from sqlalchemy.orm import Session |
| | |
| | | from app.services.redis_queue import redis_queue |
| | | from app.services.ecloud_client import ecloud_client |
| | | from app.services.dify_client import dify_client |
| | | from app.services.friend_ignore_service import friend_ignore_service |
| | | from app.services.silence_service import silence_service |
| | | from app.services.group_stats_service import group_stats_service |
| | | from config import settings |
| | | |
| | | |
| | |
| | | def __init__(self): |
| | | pass |
| | | |
| | | def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]: |
| | | def extract_refer_message_content(self, callback_data: Dict[str, Any]) -> str: |
| | | """ |
| | | 提取引用消息的内容(message_type为80014) |
| | | |
| | | Args: |
| | | callback_data: 回调数据 |
| | | |
| | | Returns: |
| | | 组合后的消息内容(content + title,中间空一行) |
| | | """ |
| | | try: |
| | | data = callback_data.get("data", {}) |
| | | content = data.get("content", "") |
| | | title = data.get("title", "") |
| | | |
| | | # 解析XML内容,提取msg>appmsg>refermsg>content标签中的内容 |
| | | refer_content = "" |
| | | xml_title = "" |
| | | try: |
| | | # 解析XML |
| | | root = ET.fromstring(content) |
| | | |
| | | # 查找msg>appmsg>refermsg>content路径 |
| | | appmsg = root.find("appmsg") |
| | | if appmsg is not None: |
| | | # 提取XML中的title(如果存在) |
| | | title_element = appmsg.find("title") |
| | | if title_element is not None and title_element.text: |
| | | xml_title = title_element.text.strip() |
| | | |
| | | # 提取引用消息内容 |
| | | refermsg = appmsg.find("refermsg") |
| | | if refermsg is not None: |
| | | content_element = refermsg.find("content") |
| | | if content_element is not None and content_element.text: |
| | | refer_content = content_element.text.strip() |
| | | |
| | | except ET.ParseError as e: |
| | | logger.warning(f"XML解析失败: {str(e)}, content={content}") |
| | | # 如果XML解析失败,使用原始content |
| | | refer_content = content |
| | | |
| | | # 确定最终使用的title:优先使用XML中的title,其次使用data.title |
| | | final_title = xml_title if xml_title else title |
| | | |
| | | # 组合内容:refer_content在前,final_title在后,中间空一行 |
| | | if refer_content and final_title: |
| | | combined_content = f"{refer_content}\n\n{final_title}" |
| | | elif refer_content: |
| | | combined_content = refer_content |
| | | elif final_title: |
| | | combined_content = final_title |
| | | else: |
| | | combined_content = content # 如果都没有,使用原始content |
| | | |
| | | logger.info( |
| | | f"引用消息内容提取完成: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}" |
| | | ) |
| | | return combined_content |
| | | |
| | | except Exception as e: |
| | | logger.error(f"提取引用消息内容异常: error={str(e)}") |
| | | # 异常情况下返回原始content |
| | | return callback_data.get("data", {}).get("content", "") |
| | | |
| | | def parse_at_mentions(self, ai_answer: str, from_user: str) -> Tuple[str, List[str]]: |
| | | """ |
| | | 解析AI回复中的@字符,提取客服名称 |
| | | |
| | |
| | | customer_service_names = settings.customer_service_names |
| | | |
| | | # 查找所有@字符后的客服名称 |
| | | at_pattern = r'@([^\s]+)' |
| | | at_pattern = r"@([^\s]+)" |
| | | matches = re.findall(at_pattern, ai_answer) |
| | | |
| | | valid_at_names = [] |
| | |
| | | with next(get_db()) as db: |
| | | for name in valid_at_names: |
| | | # 根据nick_name查找联系人 |
| | | contact = db.query(Contact).filter(Contact.nick_name == name).first() |
| | | contact = ( |
| | | db.query(Contact).filter(Contact.nick_name == name).first() |
| | | ) |
| | | if contact: |
| | | at_wc_ids.append(contact.wc_id) |
| | | logger.info(f"找到客服联系人: name={name}, wc_id={contact.wc_id}") |
| | | # 如果from_user包含@openim表示是企业微信,使用work_wc_id |
| | | if "@openim" in from_user: |
| | | at_wc_ids.append(contact.work_wc_id) |
| | | logger.info( |
| | | f"找到客服联系人-微信: name={name}, wc_id={contact.work_wc_id}" |
| | | ) |
| | | else: |
| | | at_wc_ids.append(contact.wc_id) |
| | | logger.info( |
| | | f"找到客服联系人-企业微信: name={name}, wc_id={contact.wc_id}" |
| | | ) |
| | | else: |
| | | logger.warning(f"未找到客服联系人: name={name}") |
| | | |
| | |
| | | except Exception as e: |
| | | logger.error(f"解析@字符异常: error={str(e)}") |
| | | return ai_answer, [] |
| | | |
| | | def is_end_str(self, ai_answer: str) -> bool: |
| | | """ |
| | | 解析AI回复判断是否是结束字符串 |
| | | |
| | | Args: |
| | | ai_answer: AI回复内容 |
| | | |
| | | Returns: |
| | | """ |
| | | try: |
| | | # 获取配置的结束字符串列表 |
| | | end_str_list = settings.end_str_list |
| | | for end_str in end_str_list: |
| | | substrings = end_str.split(',') |
| | | # 检查 match_str 是否包含每一个子字符串 |
| | | if all(sub in ai_answer for sub in substrings): |
| | | return True |
| | | return False |
| | | except Exception as e: |
| | | logger.error(f"解析结束字符串异常: error={str(e)}") |
| | | return False |
| | | |
| | | def is_valid_group_message(self, callback_data: Dict[str, Any]) -> bool: |
| | | """ |
| | |
| | | Returns: |
| | | 是否是有效的群聊消息 |
| | | """ |
| | | # 检查消息类型是否是群聊消息(80001) |
| | | # 检查消息类型是否是群聊消息(80001)或引用消息(80014) |
| | | message_type = callback_data.get("messageType") |
| | | if message_type != "80001": |
| | | print(f"data: {callback_data}") |
| | | if message_type not in ["80001", "80014"]: |
| | | logger.info(f"忽略非群聊消息: messageType={message_type}") |
| | | return False |
| | | |
| | |
| | | logger.warning(f"消息缺少必要字段: data={data}") |
| | | return False |
| | | |
| | | # 获取用户和群组信息 |
| | | from_user = data.get("fromUser") |
| | | from_group = data.get("fromGroup") |
| | | |
| | | # 检查发送者是否在好友忽略列表中(传入群组ID用于测试群组检查) |
| | | is_friend_ignored = friend_ignore_service.is_friend_ignored(from_user, from_group) |
| | | |
| | | if is_friend_ignored: |
| | | logger.info( |
| | | f"忽略好友发送的消息: fromUser={from_user}, fromGroup={from_group}" |
| | | ) |
| | | # 统计被忽略的好友发言次数(确保被忽略的好友消息也纳入统计) |
| | | group_stats_service.increment_user_message_count(from_group, from_user) |
| | | # 激活或延长该群组的静默模式 |
| | | if silence_service.is_silence_active(from_group): |
| | | # 如果该群组静默模式已激活,延长时间 |
| | | silence_service.extend_silence_mode(from_group) |
| | | logger.info( |
| | | f"好友消息被忽略,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}" |
| | | ) |
| | | else: |
| | | # 如果该群组静默模式未激活,激活静默模式 |
| | | silence_service.activate_silence_mode(from_group) |
| | | logger.info( |
| | | f"好友消息被忽略,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}" |
| | | ) |
| | | return False |
| | | |
| | | # # 统计正常处理的好友发言次数 |
| | | # group_stats_service.increment_user_message_count(from_group, from_user) |
| | | |
| | | # 检查该群组的静默模式是否激活(在好友忽略检查之后) |
| | | if silence_service.is_silence_active(from_group): |
| | | logger.info(f"群组静默模式激活中,忽略消息: fromGroup={from_group}") |
| | | return False |
| | | |
| | | return True |
| | | |
| | | def enqueue_callback_message(self, callback_data: Dict[str, Any]) -> bool: |
| | |
| | | if not self.is_valid_group_message(callback_data): |
| | | return False |
| | | |
| | | data = callback_data.get("data", {}) |
| | | from_user = data.get("fromUser") |
| | | # 导入消息聚合服务(延迟导入避免循环依赖) |
| | | from app.services.message_aggregator import message_aggregator |
| | | |
| | | # 将整个回调内容转成JSON字符串存储到Redis队列 |
| | | return redis_queue.enqueue_message(from_user, callback_data) |
| | | # 尝试将消息添加到聚合中 |
| | | should_process_immediately, aggregated_data = ( |
| | | message_aggregator.add_message_to_aggregation(callback_data) |
| | | ) |
| | | |
| | | if should_process_immediately: |
| | | # 需要立即处理(不聚合或聚合超时) |
| | | data = ( |
| | | aggregated_data.get("data", {}) |
| | | if aggregated_data |
| | | else callback_data.get("data", {}) |
| | | ) |
| | | from_user = data.get("fromUser") |
| | | |
| | | # 将消息加入Redis队列 |
| | | return redis_queue.enqueue_message( |
| | | from_user, aggregated_data or callback_data |
| | | ) |
| | | else: |
| | | # 消息已被聚合,不需要立即处理 |
| | | logger.info( |
| | | f"消息已添加到聚合队列,等待聚合处理: fromUser={callback_data.get('data', {}).get('fromUser')}" |
| | | ) |
| | | return True |
| | | |
| | | def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool: |
| | | """ |
| | |
| | | from_group = data.get("fromGroup") |
| | | content = data.get("content") |
| | | w_id = data.get("wId") |
| | | message_type = message_data.get("messageType") |
| | | |
| | | logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}") |
| | | logger.info( |
| | | f"开始处理消息: from_user={from_user}, from_group={from_group}, message_type={message_type}" |
| | | ) |
| | | |
| | | # 根据消息类型处理内容 |
| | | if message_type == "80014": |
| | | # 引用消息,需要提取XML中的内容并与title组合 |
| | | content = self.extract_refer_message_content(message_data) |
| | | logger.info(f"引用消息内容处理完成: content_length={len(content)}") |
| | | |
| | | # 使用上下文管理器确保数据库会话正确管理 |
| | | with next(get_db()) as db: |
| | |
| | | logger.error(f"联系人信息处理失败: from_group={from_group}") |
| | | return False |
| | | |
| | | # 3.2 获取用户在当前群组的conversation_id |
| | | # 3.2 获取群组中发言次数最多的用户昵称 |
| | | # most_active_nickname = ( |
| | | # group_stats_service.get_most_active_user_nickname(from_group) |
| | | # ) |
| | | # logger.info( |
| | | # f"群组最活跃用户昵称: group={from_group}, nickname={most_active_nickname}" |
| | | # ) |
| | | # 获取默认客服名称 |
| | | nickname = settings.customer_service_default_name |
| | | |
| | | # 3.3 获取用户在当前群组的conversation_id |
| | | conversation_id = redis_queue.get_conversation_id(from_user, from_group) |
| | | |
| | | # 调用Dify接口发送消息 |
| | | dify_response = dify_client.send_chat_message( |
| | | query=content, user=from_user, conversation_id=conversation_id |
| | | # 调用Dify接口发送消息(根据配置选择模式) |
| | | dify_response = dify_client.send_message( |
| | | query=content, |
| | | user=from_user, |
| | | conversation_id=conversation_id, |
| | | nick_name=nickname, |
| | | ) |
| | | |
| | | if silence_service.is_silence_active(from_group): |
| | | # 回复前判断是否激活静默,已静默则不回复 |
| | | logger.error(f"Dify已响应但群组已静默:from_user={from_user}") |
| | | return False |
| | | |
| | | if not dify_response: |
| | | logger.error(f"Dify响应失败: from_user={from_user}") |
| | |
| | | |
| | | # 更新Redis中的conversation_id(基于用户+群组) |
| | | if new_conversation_id: |
| | | redis_queue.set_conversation_id(from_user, new_conversation_id, from_group) |
| | | redis_queue.set_conversation_id( |
| | | from_user, new_conversation_id, from_group, settings.silence_duration_minutes * 60 |
| | | ) |
| | | |
| | | # 3.3 保存对话记录到数据库 |
| | | # 3.4 保存对话记录到数据库 |
| | | # 按用户、群组和小时分组对话记录 |
| | | current_time = datetime.now() |
| | | hour_key = current_time.strftime("%Y%m%d_%H") |
| | |
| | | Conversation.from_user == from_user, |
| | | Conversation.conversation_id == new_conversation_id, |
| | | Conversation.group == from_group, |
| | | Conversation.hour == hour_key |
| | | Conversation.hour == hour_key, |
| | | ) |
| | | .first() |
| | | ) |
| | |
| | | content_list = [] |
| | | |
| | | # 追加新的对话内容 |
| | | content_list.append({ |
| | | "user": content, |
| | | "ai": ai_answer |
| | | }) |
| | | content_list.append({"user": content, "ai": ai_answer}) |
| | | |
| | | # 更新记录 |
| | | existing_conversation.content = json.dumps(content_list, ensure_ascii=False) |
| | | existing_conversation.content = json.dumps( |
| | | content_list, ensure_ascii=False |
| | | ) |
| | | existing_conversation.is_processed = True |
| | | logger.info(f"追加到当前用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 对话轮次={len(content_list)}") |
| | | logger.info( |
| | | f"追加到当前用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 对话轮次={len(content_list)}" |
| | | ) |
| | | except json.JSONDecodeError as e: |
| | | logger.error(f"解析现有对话内容JSON失败: {str(e)}, 重新创建") |
| | | # 如果JSON解析失败,重新创建content |
| | | content_list = [{"user": content, "ai": ai_answer}] |
| | | existing_conversation.content = json.dumps(content_list, ensure_ascii=False) |
| | | existing_conversation.content = json.dumps( |
| | | content_list, ensure_ascii=False |
| | | ) |
| | | existing_conversation.is_processed = True |
| | | else: |
| | | # 创建新记录 - 新的用户群组小时对话或首次对话,使用JSON格式存储对话内容 |
| | |
| | | is_processed=True, |
| | | ) |
| | | db.add(new_conversation) |
| | | logger.info(f"创建新的用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 初始对话轮次=1") |
| | | logger.info( |
| | | f"创建新的用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 初始对话轮次=1" |
| | | ) |
| | | |
| | | db.commit() |
| | | |
| | |
| | | success = False |
| | | if ai_answer: |
| | | # 解析AI回复中的@字符 |
| | | processed_answer, at_wc_ids = self.parse_at_mentions(ai_answer) |
| | | |
| | | processed_answer, at_wc_ids = self.parse_at_mentions(ai_answer, from_user) |
| | | # 判断AI回复是否是结束字符串 |
| | | is_end_str = self.is_end_str(ai_answer) |
| | | # 发送消息,最多重试3次 |
| | | for attempt in range(3): |
| | | if at_wc_ids: |
| | |
| | | if ecloud_client.send_group_at_message( |
| | | w_id, from_group, processed_answer, at_wc_ids |
| | | ): |
| | | # @后触发静默模式 |
| | | if silence_service.is_silence_active(from_group): |
| | | # 如果该群组静默模式已激活,延长时间 |
| | | silence_service.extend_silence_mode(from_group) |
| | | logger.info( |
| | | f"AI回复@客服,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}" |
| | | ) |
| | | else: |
| | | # 如果该群组静默模式未激活,激活静默模式 |
| | | flag = silence_service.activate_silence_mode(from_group) |
| | | if flag: |
| | | logger.info( |
| | | f"AI回复@客服,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}" |
| | | ) |
| | | else: |
| | | logger.info( |
| | | f"AI回复@客服,未激活静默模式: fromUser={from_user}, fromGroup={from_group}" |
| | | ) |
| | | success = True |
| | | break |
| | | else: |
| | |
| | | success = True |
| | | break |
| | | |
| | | logger.warning(f"发送AI回答失败,尝试重试 ({attempt + 1}/3): from_user={from_user}") |
| | | logger.warning( |
| | | f"发送AI回答失败,尝试重试 ({attempt + 1}/3): from_user={from_user}" |
| | | ) |
| | | if attempt < 2: # 不是最后一次尝试,等待一段时间再重试 |
| | | time.sleep(2 ** attempt) # 指数退避 |
| | | |
| | | time.sleep(2**attempt) # 指数退避 |
| | | |
| | | # AI回复结束字符串触发静默模式 |
| | | if is_end_str: |
| | | if silence_service.is_silence_active(from_group): |
| | | # 如果该群组静默模式已激活,延长时间 |
| | | silence_service.extend_silence_mode(from_group) |
| | | logger.info( |
| | | f"AI回复结束字符串,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}" |
| | | ) |
| | | else: |
| | | # 如果该群组静默模式未激活,激活静默模式 |
| | | silence_service.activate_silence_mode(from_group) |
| | | logger.info( |
| | | f"AI回复结束字符串,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}" |
| | | ) |
| | | |
| | | if success: |
| | | # 更新发送状态 |
| | | conversation = ( |
| | |
| | | Conversation.from_user == from_user, |
| | | Conversation.conversation_id == new_conversation_id, |
| | | Conversation.group == from_group, |
| | | Conversation.hour == hour_key |
| | | Conversation.hour == hour_key, |
| | | ) |
| | | .first() |
| | | ) |