| | |
| | | """获取处理状态键""" |
| | | return f"{self.processing_prefix}{from_user}" |
| | | |
def get_conversation_key(self, from_user: str, group: Optional[str] = None) -> str:
    """Build the Redis key under which a user's conversation ID is stored.

    Args:
        from_user: Identifier of the user the conversation belongs to.
        group: Optional group identifier. When truthy it is appended to the
            key so the same user gets an independent conversation per group.

    Returns:
        ``<conversation_prefix><from_user>:<group>`` when ``group`` is
        truthy, otherwise the user-only ``<conversation_prefix><from_user>``.
    """
    user_key = f"{self.conversation_prefix}{from_user}"
    if group:
        # User + group key keeps conversations in different groups separate.
        return f"{user_key}:{group}"
    # No group (None or empty string): fall back to the older user-only key
    # format for backward compatibility.
    return user_key
| | | |
def enqueue_message(self, from_user: str, message_data: Dict[str, Any]) -> bool:
    """Append a message to the user's queue.

    The message is serialized to JSON (non-ASCII characters are kept as-is)
    and pushed onto the user's Redis list exactly once.

    Args:
        from_user: Identifier of the user whose queue receives the message.
        message_data: JSON-serializable message payload.

    Returns:
        True once the message has been pushed.
    """
    # NOTE(review): the rows around this body are elided in the reviewed
    # view; if the real method wraps it in try/except, keep that wrapper.
    queue_key = self.get_user_queue_key(from_user)
    message_json = json.dumps(message_data, ensure_ascii=False)

    # Push once, at the tail (RPUSH), so messages keep arrival order.
    # Presumably consumers pop from the head (FIFO) -- confirm against the
    # dequeue side.
    result = self.redis_client.rpush(queue_key, message_json)

    logger.info(f"消息已加入队列: user={from_user}, queue_length={result}, message={message_json}")
    return True
| | |
| | | logger.error(f"检查处理状态失败: user={from_user}, error={str(e)}") |
| | | return False |
| | | |
def get_conversation_id(self, from_user: str, group: Optional[str] = None) -> Optional[str]:
    """Look up the stored conversation ID for a user, optionally per group.

    Args:
        from_user: Identifier of the user.
        group: Optional group identifier; forwarded to
            ``get_conversation_key`` to select the key.

    Returns:
        Whatever ``redis_client.get`` returns for the computed key, or
        ``None`` if any exception is raised during the lookup.
    """
    try:
        conversation_key = self.get_conversation_key(from_user, group)
        return self.redis_client.get(conversation_key)
    except Exception as e:
        # Best-effort lookup: log the failure and report "no conversation"
        # instead of propagating the error to the caller.
        logger.error(f"获取对话ID失败: user={from_user}, group={group}, error={str(e)}")
        return None
| | | |
def set_conversation_id(
    self,
    from_user: str,
    conversation_id: str,
    group: Optional[str] = None,
    ttl: int = 86400,
) -> bool:
    """Store the conversation ID for a user, optionally per group.

    Args:
        from_user: Identifier of the user.
        conversation_id: Conversation ID to store.
        group: Optional group identifier; forwarded to
            ``get_conversation_key`` to select the key.
        ttl: Expiry passed to ``redis_client.setex`` (default 86400).
            ``group`` precedes ``ttl`` in the signature, so pass ``ttl``
            by keyword.

    Returns:
        True if the value was written, False if any exception was raised.
    """
    try:
        conversation_key = self.get_conversation_key(from_user, group)
        self.redis_client.setex(conversation_key, ttl, conversation_id)
        logger.info(
            f"设置对话ID: user={from_user}, group={group}, conversation_id={conversation_id}"
        )
        return True
    except Exception as e:
        # Best-effort write: log the failure and signal it via the return
        # value instead of propagating the error to the caller.
        logger.error(f"设置对话ID失败: user={from_user}, group={group}, error={str(e)}")
        return False
| | | |
| | | def get_queue_length(self, from_user: str) -> int: |