""" Redis队列管理服务 """ import json from typing import Optional, Dict, Any import redis from loguru import logger from config import settings class RedisQueueManager: """Redis队列管理器""" def __init__(self): self.redis_client = redis.from_url(settings.redis_url, decode_responses=True) self.queue_prefix = "ecloud_queue:" self.processing_prefix = "ecloud_processing:" self.conversation_prefix = "ecloud_conversation:" def get_user_queue_key(self, from_user: str) -> str: """获取用户队列键""" return f"{self.queue_prefix}{from_user}" def get_processing_key(self, from_user: str) -> str: """获取处理状态键""" return f"{self.processing_prefix}{from_user}" def get_conversation_key(self, from_user: str) -> str: """获取用户对话ID键""" return f"{self.conversation_prefix}{from_user}" def enqueue_message(self, from_user: str, message_data: Dict[str, Any]) -> bool: """将消息加入用户队列""" try: queue_key = self.get_user_queue_key(from_user) message_json = json.dumps(message_data, ensure_ascii=False) # 使用LPUSH将消息加入队列头部 result = self.redis_client.lpush(queue_key, message_json) logger.info(f"消息已加入队列: user={from_user}, queue_length={result}, message={message_json}") return True except Exception as e: logger.error(f"消息入队失败: user={from_user}, error={str(e)}") return False def dequeue_message( self, from_user: str, timeout: int = 0 ) -> Optional[Dict[str, Any]]: """从用户队列中取出消息""" try: queue_key = self.get_user_queue_key(from_user) if timeout > 0: # 阻塞式取出消息 result = self.redis_client.brpop(queue_key, timeout=timeout) if result: _, message_json = result else: return None else: # 非阻塞式取出消息 message_json = self.redis_client.rpop(queue_key) if not message_json: return None message_data = json.loads(message_json) logger.info(f"消息已出队: user={from_user}") return message_data except Exception as e: logger.error(f"消息出队失败: user={from_user}, error={str(e)}") return None def set_processing_status( self, from_user: str, status: bool, ttl: int = 300 ) -> bool: """设置用户处理状态""" try: processing_key = self.get_processing_key(from_user) if status: # 设置处理中状态,带过期时间 self.redis_client.setex(processing_key, ttl, "processing") logger.info( f"设置处理状态: user={from_user}, status=processing, ttl={ttl}" ) else: # 清除处理状态 self.redis_client.delete(processing_key) logger.info(f"清除处理状态: user={from_user}") return True except Exception as e: logger.error(f"设置处理状态失败: user={from_user}, error={str(e)}") return False def is_processing(self, from_user: str) -> bool: """检查用户是否正在处理中""" try: processing_key = self.get_processing_key(from_user) return self.redis_client.exists(processing_key) > 0 except Exception as e: logger.error(f"检查处理状态失败: user={from_user}, error={str(e)}") return False def get_conversation_id(self, from_user: str) -> Optional[str]: """获取用户的对话ID""" try: conversation_key = self.get_conversation_key(from_user) return self.redis_client.get(conversation_key) except Exception as e: logger.error(f"获取对话ID失败: user={from_user}, error={str(e)}") return None def set_conversation_id( self, from_user: str, conversation_id: str, ttl: int = 86400 ) -> bool: """设置用户的对话ID""" try: conversation_key = self.get_conversation_key(from_user) self.redis_client.setex(conversation_key, ttl, conversation_id) logger.info( f"设置对话ID: user={from_user}, conversation_id={conversation_id}" ) return True except Exception as e: logger.error(f"设置对话ID失败: user={from_user}, error={str(e)}") return False def get_queue_length(self, from_user: str) -> int: """获取用户队列长度""" try: queue_key = self.get_user_queue_key(from_user) return self.redis_client.llen(queue_key) except Exception as e: logger.error(f"获取队列长度失败: user={from_user}, error={str(e)}") return 0 def clear_user_queue(self, from_user: str) -> bool: """清空用户队列""" try: queue_key = self.get_user_queue_key(from_user) self.redis_client.delete(queue_key) logger.info(f"清空用户队列: user={from_user}") return True except Exception as e: logger.error(f"清空用户队列失败: user={from_user}, error={str(e)}") return False # 全局Redis队列管理器实例 redis_queue = RedisQueueManager()