1.新增结束语判断;2.增加@客服后静默;3.增加判断群已静默后AI已回复也不发消息
1个文件已添加
4个文件已修改
2个文件已删除
446 ■■■■■ 已修改文件
app/api/callback.py 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/services/message_processor.py 153 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
example_usage.py 136 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
logs/app.log 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_group_stats.py 119 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/api/callback.py
@@ -48,8 +48,13 @@
            f"收到回调消息: messageType={callback_data.messageType}, wcId={callback_data.wcId}"
        )
        # 将Pydantic模型转换为字典
        callback_dict = callback_data.model_dump()
        # 将Pydantic模型转换为字典(兼容v1和v2)
        if hasattr(callback_data, 'model_dump'):
            # Pydantic v2
            callback_dict = callback_data.model_dump()
        else:
            # Pydantic v1
            callback_dict = callback_data.dict()
        # 将消息加入队列
        success = message_processor.enqueue_callback_message(callback_dict)
app/services/message_processor.py
@@ -84,7 +84,9 @@
            else:
                combined_content = content  # 如果都没有,使用原始content
            logger.info(f"引用消息内容提取完成: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}")
            logger.info(
                f"引用消息内容提取完成: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}"
            )
            return combined_content
        except Exception as e:
@@ -107,7 +109,7 @@
            customer_service_names = settings.customer_service_names
            # 查找所有@字符后的客服名称
            at_pattern = r'@([^\s]+)'
            at_pattern = r"@([^\s]+)"
            matches = re.findall(at_pattern, ai_answer)
            valid_at_names = []
@@ -124,10 +126,14 @@
                with next(get_db()) as db:
                    for name in valid_at_names:
                        # 根据nick_name查找联系人
                        contact = db.query(Contact).filter(Contact.nick_name == name).first()
                        contact = (
                            db.query(Contact).filter(Contact.nick_name == name).first()
                        )
                        if contact:
                            at_wc_ids.append(contact.wc_id)
                            logger.info(f"找到客服联系人: name={name}, wc_id={contact.wc_id}")
                            logger.info(
                                f"找到客服联系人: name={name}, wc_id={contact.wc_id}"
                            )
                        else:
                            logger.warning(f"未找到客服联系人: name={name}")
@@ -136,6 +142,28 @@
        except Exception as e:
            logger.error(f"解析@字符异常: error={str(e)}")
            return ai_answer, []
    def is_end_str(self, ai_answer: str) -> bool:
        """
        解析AI回复判断是否是结束字符串
        Args:
            ai_answer: AI回复内容
        Returns:
        """
        try:
            # 获取配置的结束字符串列表
            end_str_list = settings.end_str_list
            for end_str in end_str_list:
                substrings = end_str.split(',')
                # 检查 match_str 是否包含每一个子字符串
                if all(sub in ai_answer for sub in substrings):
                    return True
            return False
        except Exception as e:
            logger.error(f"解析结束字符串异常: error={str(e)}")
            return False
    def is_valid_group_message(self, callback_data: Dict[str, Any]) -> bool:
        """
@@ -177,18 +205,24 @@
        is_friend_ignored = friend_ignore_service.is_friend_ignored(from_user)
        if is_friend_ignored:
            logger.info(f"忽略好友发送的消息: fromUser={from_user}, fromGroup={from_group}")
            logger.info(
                f"忽略好友发送的消息: fromUser={from_user}, fromGroup={from_group}"
            )
            # 统计被忽略的好友发言次数(确保被忽略的好友消息也纳入统计)
            group_stats_service.increment_user_message_count(from_group, from_user)
            # 激活或延长该群组的静默模式
            if silence_service.is_silence_active(from_group):
                # 如果该群组静默模式已激活,延长时间
                silence_service.extend_silence_mode(from_group)
                logger.info(f"好友消息被忽略,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}")
                logger.info(
                    f"好友消息被忽略,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}"
                )
            else:
                # 如果该群组静默模式未激活,激活静默模式
                silence_service.activate_silence_mode(from_group)
                logger.info(f"好友消息被忽略,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}")
                logger.info(
                    f"好友消息被忽略,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}"
                )
            return False
        # # 统计正常处理的好友发言次数
@@ -219,18 +253,28 @@
        from app.services.message_aggregator import message_aggregator
        # 尝试将消息添加到聚合中
        should_process_immediately, aggregated_data = message_aggregator.add_message_to_aggregation(callback_data)
        should_process_immediately, aggregated_data = (
            message_aggregator.add_message_to_aggregation(callback_data)
        )
        if should_process_immediately:
            # 需要立即处理(不聚合或聚合超时)
            data = aggregated_data.get("data", {}) if aggregated_data else callback_data.get("data", {})
            data = (
                aggregated_data.get("data", {})
                if aggregated_data
                else callback_data.get("data", {})
            )
            from_user = data.get("fromUser")
            # 将消息加入Redis队列
            return redis_queue.enqueue_message(from_user, aggregated_data or callback_data)
            return redis_queue.enqueue_message(
                from_user, aggregated_data or callback_data
            )
        else:
            # 消息已被聚合,不需要立即处理
            logger.info(f"消息已添加到聚合队列,等待聚合处理: fromUser={callback_data.get('data', {}).get('fromUser')}")
            logger.info(
                f"消息已添加到聚合队列,等待聚合处理: fromUser={callback_data.get('data', {}).get('fromUser')}"
            )
            return True
    def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool:
@@ -309,7 +353,9 @@
            w_id = data.get("wId")
            message_type = message_data.get("messageType")
            logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}, message_type={message_type}")
            logger.info(
                f"开始处理消息: from_user={from_user}, from_group={from_group}, message_type={message_type}"
            )
            # 根据消息类型处理内容
            if message_type == "80014":
@@ -325,8 +371,12 @@
                    return False
                # 3.2 获取群组中发言次数最多的用户昵称
                most_active_nickname = group_stats_service.get_most_active_user_nickname(from_group)
                logger.info(f"群组最活跃用户昵称: group={from_group}, nickname={most_active_nickname}")
                most_active_nickname = (
                    group_stats_service.get_most_active_user_nickname(from_group)
                )
                logger.info(
                    f"群组最活跃用户昵称: group={from_group}, nickname={most_active_nickname}"
                )
                # 3.3 获取用户在当前群组的conversation_id
                conversation_id = redis_queue.get_conversation_id(from_user, from_group)
@@ -336,8 +386,13 @@
                    query=content,
                    user=from_user,
                    conversation_id=conversation_id,
                    nick_name=most_active_nickname
                    nick_name=most_active_nickname,
                )
                if silence_service.is_silence_active(from_group):
                    # 回复前判断是否激活静默,已静默则不回复
                    logger.error(f"Dify已响应但群组已静默:from_user={from_user}")
                    return False
                if not dify_response:
                    logger.error(f"Dify响应失败: from_user={from_user}")
@@ -349,7 +404,9 @@
                # 更新Redis中的conversation_id(基于用户+群组)
                if new_conversation_id:
                    redis_queue.set_conversation_id(from_user, new_conversation_id, from_group, 1800)
                    redis_queue.set_conversation_id(
                        from_user, new_conversation_id, from_group, settings.silence_duration_minutes * 60
                    )
                # 3.4 保存对话记录到数据库
                # 按用户、群组和小时分组对话记录
@@ -363,7 +420,7 @@
                        Conversation.from_user == from_user,
                        Conversation.conversation_id == new_conversation_id,
                        Conversation.group == from_group,
                        Conversation.hour == hour_key
                        Conversation.hour == hour_key,
                    )
                    .first()
                )
@@ -378,20 +435,23 @@
                            content_list = []
                        # 追加新的对话内容
                        content_list.append({
                            "user": content,
                            "ai": ai_answer
                        })
                        content_list.append({"user": content, "ai": ai_answer})
                        # 更新记录
                        existing_conversation.content = json.dumps(content_list, ensure_ascii=False)
                        existing_conversation.content = json.dumps(
                            content_list, ensure_ascii=False
                        )
                        existing_conversation.is_processed = True
                        logger.info(f"追加到当前用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 对话轮次={len(content_list)}")
                        logger.info(
                            f"追加到当前用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 对话轮次={len(content_list)}"
                        )
                    except json.JSONDecodeError as e:
                        logger.error(f"解析现有对话内容JSON失败: {str(e)}, 重新创建")
                        # 如果JSON解析失败,重新创建content
                        content_list = [{"user": content, "ai": ai_answer}]
                        existing_conversation.content = json.dumps(content_list, ensure_ascii=False)
                        existing_conversation.content = json.dumps(
                            content_list, ensure_ascii=False
                        )
                        existing_conversation.is_processed = True
                else:
                    # 创建新记录 - 新的用户群组小时对话或首次对话,使用JSON格式存储对话内容
@@ -405,7 +465,9 @@
                        is_processed=True,
                    )
                    db.add(new_conversation)
                    logger.info(f"创建新的用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 初始对话轮次=1")
                    logger.info(
                        f"创建新的用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 初始对话轮次=1"
                    )
                db.commit()
@@ -414,7 +476,8 @@
                if ai_answer:
                    # 解析AI回复中的@字符
                    processed_answer, at_wc_ids = self.parse_at_mentions(ai_answer)
                    # 判断AI回复是否是结束字符串
                    is_end_str = self.is_end_str(ai_answer)
                    # 发送消息,最多重试3次
                    for attempt in range(3):
                        if at_wc_ids:
@@ -423,6 +486,19 @@
                            if ecloud_client.send_group_at_message(
                                w_id, from_group, processed_answer, at_wc_ids
                            ):
                                # @后触发静默模式
                                if silence_service.is_silence_active(from_group):
                                    # 如果该群组静默模式已激活,延长时间
                                    silence_service.extend_silence_mode(from_group)
                                    logger.info(
                                        f"AI回复@客服,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}"
                                    )
                                else:
                                    # 如果该群组静默模式未激活,激活静默模式
                                    silence_service.activate_silence_mode(from_group)
                                    logger.info(
                                        f"AI回复@客服,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}"
                                    )
                                success = True
                                break
                        else:
@@ -434,10 +510,27 @@
                                success = True
                                break
                        logger.warning(f"发送AI回答失败,尝试重试 ({attempt + 1}/3): from_user={from_user}")
                        logger.warning(
                            f"发送AI回答失败,尝试重试 ({attempt + 1}/3): from_user={from_user}"
                        )
                        if attempt < 2:  # 不是最后一次尝试,等待一段时间再重试
                            time.sleep(2 ** attempt)  # 指数退避
                            time.sleep(2**attempt)  # 指数退避
                    # AI回复结束字符串触发静默模式
                    if is_end_str:
                        if silence_service.is_silence_active(from_group):
                            # 如果该群组静默模式已激活,延长时间
                            silence_service.extend_silence_mode(from_group)
                            logger.info(
                                f"AI回复结束字符串,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}"
                            )
                        else:
                            # 如果该群组静默模式未激活,激活静默模式
                            silence_service.activate_silence_mode(from_group)
                            logger.info(
                                f"AI回复结束字符串,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}"
                            )
                if success:
                    # 更新发送状态
                    conversation = (
@@ -446,7 +539,7 @@
                            Conversation.from_user == from_user,
                            Conversation.conversation_id == new_conversation_id,
                            Conversation.group == from_group,
                            Conversation.hour == hour_key
                            Conversation.hour == hour_key,
                        )
                        .first()
                    )
config.py
@@ -78,9 +78,9 @@
        self.friend_ignore_whitelist = friend_ignore_config["whitelist"]
        # 静默模式配置
        silence_mode_config = config_data["silence_mode"]
        self.silence_mode_enabled = silence_mode_config["enabled"]
        self.silence_duration_minutes = silence_mode_config["duration_minutes"]
        silence_mode_config = config_data.get("silence_mode", {})
        self.silence_mode_enabled = silence_mode_config.get("enabled", True)
        self.silence_duration_minutes = silence_mode_config.get("duration_minutes", 10)
        # 在线状态监控配置
        online_status_config = config_data["online_status_monitor"]
@@ -116,6 +116,9 @@
        self.message_aggregation_enabled = message_aggregation_config.get("enabled", True)
        self.message_aggregation_timeout = message_aggregation_config.get("timeout_seconds", 15)
        # 结束字符串配置
        self.end_str_list = config_data.get("end_str_list", [])
    def update_ecloud_w_id(self, new_w_id: str) -> bool:
        """
        动态更新E云管家的w_id配置
example_usage.py
File was deleted
logs/app.log
@@ -1,5 +1,2 @@
2025-08-07 16:49:53 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务
2025-08-07 17:07:18 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务
2025-08-07 17:09:51 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务
2025-08-07 17:48:05 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务
2025-08-07 17:51:18 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务
2025-08-22 16:15:05 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务
2025-08-22 16:34:15 | INFO | __main__:<module>:132 - 启动E云管家-DifyAI对接服务
test.py
New file
@@ -0,0 +1,13 @@
def match_string(pattern_list, match_str):
    for pattern in pattern_list:
        substrings = pattern.split(',')
        # 检查 match_str 是否包含每一个子字符串
        if all(sub in match_str for sub in substrings):
            return True
    return False
# 示例使用
pattern_list = ["嗯嗯,问题,随时", "随时,找我"]
match_str = "嗯嗯,有什么事情随时联系"
result = match_string(pattern_list, match_str)
print(result)  # 输出: True
test_group_stats.py
File was deleted