yj
2025-07-23 1225b6cbf0a028b765a0ab6d784bcb80459a67bb
app/services/redis_queue.py
@@ -26,9 +26,14 @@
        """获取处理状态键"""
        return f"{self.processing_prefix}{from_user}"
    def get_conversation_key(self, from_user: str) -> str:
    def get_conversation_key(self, from_user: str, group: str = None) -> str:
        """获取用户对话ID键"""
        return f"{self.conversation_prefix}{from_user}"
        if group:
            # 基于用户+群组生成conversation_key,确保不同群组的对话独立
            return f"{self.conversation_prefix}{from_user}:{group}"
        else:
            # 兼容旧版本,仅基于用户
            return f"{self.conversation_prefix}{from_user}"
    def enqueue_message(self, from_user: str, message_data: Dict[str, Any]) -> bool:
        """将消息加入用户队列"""
@@ -36,8 +41,8 @@
            queue_key = self.get_user_queue_key(from_user)
            message_json = json.dumps(message_data, ensure_ascii=False)
            # 使用LPUSH将消息加入队列头部
            result = self.redis_client.lpush(queue_key, message_json)
            # 使用RPUSH将消息加入队列尾部,确保FIFO顺序
            result = self.redis_client.rpush(queue_key, message_json)
            logger.info(f"消息已加入队列: user={from_user}, queue_length={result}, message={message_json}")
            return True
@@ -107,28 +112,28 @@
            logger.error(f"检查处理状态失败: user={from_user}, error={str(e)}")
            return False
    def get_conversation_id(self, from_user: str) -> Optional[str]:
    def get_conversation_id(self, from_user: str, group: str = None) -> Optional[str]:
        """获取用户的对话ID"""
        try:
            conversation_key = self.get_conversation_key(from_user)
            conversation_key = self.get_conversation_key(from_user, group)
            return self.redis_client.get(conversation_key)
        except Exception as e:
            logger.error(f"获取对话ID失败: user={from_user}, error={str(e)}")
            logger.error(f"获取对话ID失败: user={from_user}, group={group}, error={str(e)}")
            return None
    def set_conversation_id(
        self, from_user: str, conversation_id: str, ttl: int = 86400
        self, from_user: str, conversation_id: str, group: str = None, ttl: int = 86400
    ) -> bool:
        """设置用户的对话ID"""
        try:
            conversation_key = self.get_conversation_key(from_user)
            conversation_key = self.get_conversation_key(from_user, group)
            self.redis_client.setex(conversation_key, ttl, conversation_id)
            logger.info(
                f"设置对话ID: user={from_user}, conversation_id={conversation_id}"
                f"设置对话ID: user={from_user}, group={group}, conversation_id={conversation_id}"
            )
            return True
        except Exception as e:
            logger.error(f"设置对话ID失败: user={from_user}, error={str(e)}")
            logger.error(f"设置对话ID失败: user={from_user}, group={group}, error={str(e)}")
            return False
    def get_queue_length(self, from_user: str) -> int: