| | |
| | | """ |
| | | |
| | | import json |
| | | from typing import Dict, Any, Optional |
| | | import time |
| | | import re |
| | | from typing import Dict, Any, Optional, List, Tuple |
| | | from datetime import datetime |
| | | from sqlalchemy.orm import Session |
| | | from loguru import logger |
| | | |
| | |
| | | from app.services.redis_queue import redis_queue |
| | | from app.services.ecloud_client import ecloud_client |
| | | from app.services.dify_client import dify_client |
| | | from config import settings |
| | | |
| | | |
| | | class MessageProcessor: |
| | |
| | | |
| | | def __init__(self): |
| | | pass |
| | | |
| | | def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]: |
| | | """ |
| | | 解析AI回复中的@字符,提取客服名称 |
| | | |
| | | Args: |
| | | ai_answer: AI回复内容 |
| | | |
| | | Returns: |
| | | (处理后的消息内容, 需要@的客服wcid列表) |
| | | """ |
| | | try: |
| | | # 获取配置的客服名称列表 |
| | | customer_service_names = settings.customer_service_names |
| | | |
| | | # 查找所有@字符后的客服名称 |
| | | at_pattern = r'@([^\s]+)' |
| | | matches = re.findall(at_pattern, ai_answer) |
| | | |
| | | valid_at_names = [] |
| | | at_wc_ids = [] |
| | | |
| | | for match in matches: |
| | | # 检查是否在配置的客服名称列表中 |
| | | if match in customer_service_names: |
| | | valid_at_names.append(match) |
| | | logger.info(f"发现有效的@客服名称: {match}") |
| | | |
| | | # 如果有有效的@客服名称,查询数据库获取wcid |
| | | if valid_at_names: |
| | | with next(get_db()) as db: |
| | | for name in valid_at_names: |
| | | # 根据nick_name查找联系人 |
| | | contact = db.query(Contact).filter(Contact.nick_name == name).first() |
| | | if contact: |
| | | at_wc_ids.append(contact.wc_id) |
| | | logger.info(f"找到客服联系人: name={name}, wc_id={contact.wc_id}") |
| | | else: |
| | | logger.warning(f"未找到客服联系人: name={name}") |
| | | |
| | | return ai_answer, at_wc_ids |
| | | |
| | | except Exception as e: |
| | | logger.error(f"解析@字符异常: error={str(e)}") |
| | | return ai_answer, [] |
| | | |
| | | def is_valid_group_message(self, callback_data: Dict[str, Any]) -> bool: |
| | | """ |
| | |
| | | |
| | | logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}") |
| | | |
| | | # 获取数据库会话 |
| | | db = next(get_db()) |
| | | |
| | | try: |
| | | # 使用上下文管理器确保数据库会话正确管理 |
| | | with next(get_db()) as db: |
| | | # 3.1 确保联系人信息存在 |
| | | if not self.ensure_contact_exists(from_group, w_id, db): |
| | | logger.error(f"联系人信息处理失败: from_group={from_group}") |
| | | return False |
| | | |
| | | # 3.2 获取用户的conversation_id |
| | | conversation_id = redis_queue.get_conversation_id(from_user) |
| | | # 3.2 获取用户在当前群组的conversation_id |
| | | conversation_id = redis_queue.get_conversation_id(from_user, from_group) |
| | | |
| | | # 调用Dify接口发送消息 |
| | | dify_response = dify_client.send_chat_message( |
| | |
| | | ai_answer = dify_response.get("answer", "") |
| | | new_conversation_id = dify_response.get("conversation_id", "") |
| | | |
| | | # 更新Redis中的conversation_id |
| | | # 更新Redis中的conversation_id(基于用户+群组) |
| | | if new_conversation_id: |
| | | redis_queue.set_conversation_id(from_user, new_conversation_id) |
| | | redis_queue.set_conversation_id(from_user, new_conversation_id, from_group) |
| | | |
| | | # 3.3 保存对话记录到数据库 |
| | | user_conversation_key = f"{from_user}_{new_conversation_id}" |
| | | # 按用户、群组和小时分组对话记录 |
| | | current_time = datetime.now() |
| | | hour_key = current_time.strftime("%Y%m%d_%H") |
| | | |
| | | # 检查是否已存在记录 |
| | | # 查找当前用户在当前群组当前小时的对话记录 |
| | | existing_conversation = ( |
| | | db.query(Conversation) |
| | | .filter(Conversation.user_conversation_key == user_conversation_key) |
| | | .filter( |
| | | Conversation.from_user == from_user, |
| | | Conversation.conversation_id == new_conversation_id, |
| | | Conversation.group == from_group, |
| | | Conversation.hour == hour_key |
| | | ) |
| | | .first() |
| | | ) |
| | | |
| | | if existing_conversation: |
| | | # 更新现有记录 |
| | | existing_conversation.user_question = content |
| | | existing_conversation.ai_answer = ai_answer |
| | | existing_conversation.is_processed = True |
| | | logger.info(f"更新对话记录: key={user_conversation_key}") |
| | | # 更新现有记录 - 使用JSON格式追加对话内容(当前用户在当前群组当前小时的对话) |
| | | try: |
| | | # 解析现有的content JSON |
| | | if existing_conversation.content: |
| | | content_list = json.loads(existing_conversation.content) |
| | | else: |
| | | # 创建新记录 |
| | | content_list = [] |
| | | |
| | | # 追加新的对话内容 |
| | | content_list.append({ |
| | | "user": content, |
| | | "ai": ai_answer |
| | | }) |
| | | |
| | | # 更新记录 |
| | | existing_conversation.content = json.dumps(content_list, ensure_ascii=False) |
| | | existing_conversation.is_processed = True |
| | | logger.info(f"追加到当前用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 对话轮次={len(content_list)}") |
| | | except json.JSONDecodeError as e: |
| | | logger.error(f"解析现有对话内容JSON失败: {str(e)}, 重新创建") |
| | | # 如果JSON解析失败,重新创建content |
| | | content_list = [{"user": content, "ai": ai_answer}] |
| | | existing_conversation.content = json.dumps(content_list, ensure_ascii=False) |
| | | existing_conversation.is_processed = True |
| | | else: |
| | | # 创建新记录 - 新的用户群组小时对话或首次对话,使用JSON格式存储对话内容 |
| | | content_list = [{"user": content, "ai": ai_answer}] |
| | | new_conversation = Conversation( |
| | | user_conversation_key=user_conversation_key, |
| | | from_user=from_user, |
| | | conversation_id=new_conversation_id, |
| | | user_question=content, |
| | | ai_answer=ai_answer, |
| | | group=from_group, |
| | | hour=hour_key, |
| | | content=json.dumps(content_list, ensure_ascii=False), |
| | | is_processed=True, |
| | | ) |
| | | db.add(new_conversation) |
| | | logger.info(f"创建对话记录: key={user_conversation_key}") |
| | | logger.info(f"创建新的用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 初始对话轮次=1") |
| | | |
| | | db.commit() |
| | | |
| | | # 发送AI回答到群聊 |
| | | if ai_answer and ecloud_client.send_group_message( |
| | | w_id, from_group, ai_answer |
| | | success = False |
| | | if ai_answer: |
| | | # 解析AI回复中的@字符 |
| | | processed_answer, at_wc_ids = self.parse_at_mentions(ai_answer) |
| | | |
| | | # 发送消息,最多重试3次 |
| | | for attempt in range(3): |
| | | if at_wc_ids: |
| | | # 如果有@客服,使用群聊@接口 |
| | | logger.info(f"使用群聊@接口发送消息: at_wc_ids={at_wc_ids}") |
| | | if ecloud_client.send_group_at_message( |
| | | w_id, from_group, processed_answer, at_wc_ids |
| | | ): |
| | | success = True |
| | | break |
| | | else: |
| | | # 普通群聊消息 |
| | | logger.info("使用普通群聊接口发送消息") |
| | | if ecloud_client.send_group_message( |
| | | w_id, from_group, processed_answer |
| | | ): |
| | | success = True |
| | | break |
| | | |
| | | logger.warning(f"发送AI回答失败,尝试重试 ({attempt + 1}/3): from_user={from_user}") |
| | | if attempt < 2: # 不是最后一次尝试,等待一段时间再重试 |
| | | time.sleep(2 ** attempt) # 指数退避 |
| | | |
| | | if success: |
| | | # 更新发送状态 |
| | | conversation = ( |
| | | db.query(Conversation) |
| | | .filter( |
| | | Conversation.user_conversation_key == user_conversation_key |
| | | Conversation.from_user == from_user, |
| | | Conversation.conversation_id == new_conversation_id, |
| | | Conversation.group == from_group, |
| | | Conversation.hour == hour_key |
| | | ) |
| | | .first() |
| | | ) |
| | | if conversation: |
| | | conversation.is_sent = True |
| | | conversation.sent_time = current_time |
| | | db.commit() |
| | | |
| | | logger.info(f"消息处理完成: from_user={from_user}") |
| | |
| | | else: |
| | | logger.error(f"发送AI回答失败: from_user={from_user}") |
| | | return False |
| | | |
| | | finally: |
| | | db.close() |
| | | |
| | | except Exception as e: |
| | | logger.error(f"处理消息异常: from_user={from_user}, error={str(e)}") |