| | |
| | | else: |
| | | combined_content = content # 如果都没有,使用原始content |
| | | |
| | | logger.info(f"引用消息内容提取完成: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}") |
| | | logger.info( |
| | | f"引用消息内容提取完成: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}" |
| | | ) |
| | | return combined_content |
| | | |
| | | except Exception as e: |
| | |
| | | customer_service_names = settings.customer_service_names |
| | | |
| | | # 查找所有@字符后的客服名称 |
| | | at_pattern = r'@([^\s]+)' |
| | | at_pattern = r"@([^\s]+)" |
| | | matches = re.findall(at_pattern, ai_answer) |
| | | |
| | | valid_at_names = [] |
| | |
| | | with next(get_db()) as db: |
| | | for name in valid_at_names: |
| | | # 根据nick_name查找联系人 |
| | | contact = db.query(Contact).filter(Contact.nick_name == name).first() |
| | | contact = ( |
| | | db.query(Contact).filter(Contact.nick_name == name).first() |
| | | ) |
| | | if contact: |
| | | at_wc_ids.append(contact.wc_id) |
| | | logger.info(f"找到客服联系人: name={name}, wc_id={contact.wc_id}") |
| | | logger.info( |
| | | f"找到客服联系人: name={name}, wc_id={contact.wc_id}" |
| | | ) |
| | | else: |
| | | logger.warning(f"未找到客服联系人: name={name}") |
| | | |
| | |
| | | except Exception as e: |
| | | logger.error(f"解析@字符异常: error={str(e)}") |
| | | return ai_answer, [] |
| | | |
| | | def is_end_str(self, ai_answer: str) -> bool: |
| | | """ |
| | | 解析AI回复判断是否是结束字符串 |
| | | |
| | | Args: |
| | | ai_answer: AI回复内容 |
| | | |
| | | Returns: |
| | | """ |
| | | try: |
| | | # 获取配置的结束字符串列表 |
| | | end_str_list = settings.end_str_list |
| | | for end_str in end_str_list: |
| | | substrings = end_str.split(',') |
| | | # 检查 match_str 是否包含每一个子字符串 |
| | | if all(sub in ai_answer for sub in substrings): |
| | | return True |
| | | return False |
| | | except Exception as e: |
| | | logger.error(f"解析结束字符串异常: error={str(e)}") |
| | | return False |
| | | |
| | | def is_valid_group_message(self, callback_data: Dict[str, Any]) -> bool: |
| | | """ |
| | |
| | | is_friend_ignored = friend_ignore_service.is_friend_ignored(from_user) |
| | | |
| | | if is_friend_ignored: |
| | | logger.info(f"忽略好友发送的消息: fromUser={from_user}, fromGroup={from_group}") |
| | | logger.info( |
| | | f"忽略好友发送的消息: fromUser={from_user}, fromGroup={from_group}" |
| | | ) |
| | | # 统计被忽略的好友发言次数(确保被忽略的好友消息也纳入统计) |
| | | group_stats_service.increment_user_message_count(from_group, from_user) |
| | | # 激活或延长该群组的静默模式 |
| | | if silence_service.is_silence_active(from_group): |
| | | # 如果该群组静默模式已激活,延长时间 |
| | | silence_service.extend_silence_mode(from_group) |
| | | logger.info(f"好友消息被忽略,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}") |
| | | logger.info( |
| | | f"好友消息被忽略,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}" |
| | | ) |
| | | else: |
| | | # 如果该群组静默模式未激活,激活静默模式 |
| | | silence_service.activate_silence_mode(from_group) |
| | | logger.info(f"好友消息被忽略,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}") |
| | | logger.info( |
| | | f"好友消息被忽略,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}" |
| | | ) |
| | | return False |
| | | |
| | | # # 统计正常处理的好友发言次数 |
| | |
| | | from app.services.message_aggregator import message_aggregator |
| | | |
| | | # 尝试将消息添加到聚合中 |
| | | should_process_immediately, aggregated_data = message_aggregator.add_message_to_aggregation(callback_data) |
| | | should_process_immediately, aggregated_data = ( |
| | | message_aggregator.add_message_to_aggregation(callback_data) |
| | | ) |
| | | |
| | | if should_process_immediately: |
| | | # 需要立即处理(不聚合或聚合超时) |
| | | data = aggregated_data.get("data", {}) if aggregated_data else callback_data.get("data", {}) |
| | | data = ( |
| | | aggregated_data.get("data", {}) |
| | | if aggregated_data |
| | | else callback_data.get("data", {}) |
| | | ) |
| | | from_user = data.get("fromUser") |
| | | |
| | | # 将消息加入Redis队列 |
| | | return redis_queue.enqueue_message(from_user, aggregated_data or callback_data) |
| | | return redis_queue.enqueue_message( |
| | | from_user, aggregated_data or callback_data |
| | | ) |
| | | else: |
| | | # 消息已被聚合,不需要立即处理 |
| | | logger.info(f"消息已添加到聚合队列,等待聚合处理: fromUser={callback_data.get('data', {}).get('fromUser')}") |
| | | logger.info( |
| | | f"消息已添加到聚合队列,等待聚合处理: fromUser={callback_data.get('data', {}).get('fromUser')}" |
| | | ) |
| | | return True |
| | | |
| | | def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool: |
| | |
| | | w_id = data.get("wId") |
| | | message_type = message_data.get("messageType") |
| | | |
| | | logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}, message_type={message_type}") |
| | | logger.info( |
| | | f"开始处理消息: from_user={from_user}, from_group={from_group}, message_type={message_type}" |
| | | ) |
| | | |
| | | # 根据消息类型处理内容 |
| | | if message_type == "80014": |
| | |
| | | return False |
| | | |
| | | # 3.2 获取群组中发言次数最多的用户昵称 |
| | | most_active_nickname = group_stats_service.get_most_active_user_nickname(from_group) |
| | | logger.info(f"群组最活跃用户昵称: group={from_group}, nickname={most_active_nickname}") |
| | | most_active_nickname = ( |
| | | group_stats_service.get_most_active_user_nickname(from_group) |
| | | ) |
| | | logger.info( |
| | | f"群组最活跃用户昵称: group={from_group}, nickname={most_active_nickname}" |
| | | ) |
| | | |
| | | # 3.3 获取用户在当前群组的conversation_id |
| | | conversation_id = redis_queue.get_conversation_id(from_user, from_group) |
| | |
| | | query=content, |
| | | user=from_user, |
| | | conversation_id=conversation_id, |
| | | nick_name=most_active_nickname |
| | | nick_name=most_active_nickname, |
| | | ) |
| | | |
| | | if silence_service.is_silence_active(from_group): |
| | | # 回复前判断是否激活静默,已静默则不回复 |
| | | logger.error(f"Dify已响应但群组已静默:from_user={from_user}") |
| | | return False |
| | | |
| | | if not dify_response: |
| | | logger.error(f"Dify响应失败: from_user={from_user}") |
| | |
| | | |
| | | # 更新Redis中的conversation_id(基于用户+群组) |
| | | if new_conversation_id: |
| | | redis_queue.set_conversation_id(from_user, new_conversation_id, from_group, 1800) |
| | | redis_queue.set_conversation_id( |
| | | from_user, new_conversation_id, from_group, settings.silence_duration_minutes * 60 |
| | | ) |
| | | |
| | | # 3.4 保存对话记录到数据库 |
| | | # 按用户、群组和小时分组对话记录 |
| | |
| | | Conversation.from_user == from_user, |
| | | Conversation.conversation_id == new_conversation_id, |
| | | Conversation.group == from_group, |
| | | Conversation.hour == hour_key |
| | | Conversation.hour == hour_key, |
| | | ) |
| | | .first() |
| | | ) |
| | |
| | | content_list = [] |
| | | |
| | | # 追加新的对话内容 |
| | | content_list.append({ |
| | | "user": content, |
| | | "ai": ai_answer |
| | | }) |
| | | content_list.append({"user": content, "ai": ai_answer}) |
| | | |
| | | # 更新记录 |
| | | existing_conversation.content = json.dumps(content_list, ensure_ascii=False) |
| | | existing_conversation.content = json.dumps( |
| | | content_list, ensure_ascii=False |
| | | ) |
| | | existing_conversation.is_processed = True |
| | | logger.info(f"追加到当前用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 对话轮次={len(content_list)}") |
| | | logger.info( |
| | | f"追加到当前用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 对话轮次={len(content_list)}" |
| | | ) |
| | | except json.JSONDecodeError as e: |
| | | logger.error(f"解析现有对话内容JSON失败: {str(e)}, 重新创建") |
| | | # 如果JSON解析失败,重新创建content |
| | | content_list = [{"user": content, "ai": ai_answer}] |
| | | existing_conversation.content = json.dumps(content_list, ensure_ascii=False) |
| | | existing_conversation.content = json.dumps( |
| | | content_list, ensure_ascii=False |
| | | ) |
| | | existing_conversation.is_processed = True |
| | | else: |
| | | # 创建新记录 - 新的用户群组小时对话或首次对话,使用JSON格式存储对话内容 |
| | |
| | | is_processed=True, |
| | | ) |
| | | db.add(new_conversation) |
| | | logger.info(f"创建新的用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 初始对话轮次=1") |
| | | logger.info( |
| | | f"创建新的用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 初始对话轮次=1" |
| | | ) |
| | | |
| | | db.commit() |
| | | |
| | |
| | | if ai_answer: |
| | | # 解析AI回复中的@字符 |
| | | processed_answer, at_wc_ids = self.parse_at_mentions(ai_answer) |
| | | |
| | | # 判断AI回复是否是结束字符串 |
| | | is_end_str = self.is_end_str(ai_answer) |
| | | # 发送消息,最多重试3次 |
| | | for attempt in range(3): |
| | | if at_wc_ids: |
| | |
| | | if ecloud_client.send_group_at_message( |
| | | w_id, from_group, processed_answer, at_wc_ids |
| | | ): |
| | | # @后触发静默模式 |
| | | if silence_service.is_silence_active(from_group): |
| | | # 如果该群组静默模式已激活,延长时间 |
| | | silence_service.extend_silence_mode(from_group) |
| | | logger.info( |
| | | f"AI回复@客服,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}" |
| | | ) |
| | | else: |
| | | # 如果该群组静默模式未激活,激活静默模式 |
| | | silence_service.activate_silence_mode(from_group) |
| | | logger.info( |
| | | f"AI回复@客服,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}" |
| | | ) |
| | | success = True |
| | | break |
| | | else: |
| | |
| | | success = True |
| | | break |
| | | |
| | | logger.warning(f"发送AI回答失败,尝试重试 ({attempt + 1}/3): from_user={from_user}") |
| | | logger.warning( |
| | | f"发送AI回答失败,尝试重试 ({attempt + 1}/3): from_user={from_user}" |
| | | ) |
| | | if attempt < 2: # 不是最后一次尝试,等待一段时间再重试 |
| | | time.sleep(2 ** attempt) # 指数退避 |
| | | |
| | | time.sleep(2**attempt) # 指数退避 |
| | | |
| | | # AI回复结束字符串触发静默模式 |
| | | if is_end_str: |
| | | if silence_service.is_silence_active(from_group): |
| | | # 如果该群组静默模式已激活,延长时间 |
| | | silence_service.extend_silence_mode(from_group) |
| | | logger.info( |
| | | f"AI回复结束字符串,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}" |
| | | ) |
| | | else: |
| | | # 如果该群组静默模式未激活,激活静默模式 |
| | | silence_service.activate_silence_mode(from_group) |
| | | logger.info( |
| | | f"AI回复结束字符串,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}" |
| | | ) |
| | | |
| | | if success: |
| | | # 更新发送状态 |
| | | conversation = ( |
| | |
| | | Conversation.from_user == from_user, |
| | | Conversation.conversation_id == new_conversation_id, |
| | | Conversation.group == from_group, |
| | | Conversation.hour == hour_key |
| | | Conversation.hour == hour_key, |
| | | ) |
| | | .first() |
| | | ) |