"""
|
Redis队列管理服务
|
"""
|
|
import json
|
from typing import Optional, Dict, Any
|
import redis
|
from loguru import logger
|
from config import settings
|
|
|
class RedisQueueManager:
|
"""Redis队列管理器"""
|
|
def __init__(self):
|
self.redis_client = redis.from_url(settings.redis_url, decode_responses=True)
|
self.queue_prefix = "ecloud_queue:"
|
self.processing_prefix = "ecloud_processing:"
|
self.conversation_prefix = "ecloud_conversation:"
|
|
def get_user_queue_key(self, from_user: str) -> str:
|
"""获取用户队列键"""
|
return f"{self.queue_prefix}{from_user}"
|
|
def get_processing_key(self, from_user: str) -> str:
|
"""获取处理状态键"""
|
return f"{self.processing_prefix}{from_user}"
|
|
def get_conversation_key(self, from_user: str) -> str:
|
"""获取用户对话ID键"""
|
return f"{self.conversation_prefix}{from_user}"
|
|
def enqueue_message(self, from_user: str, message_data: Dict[str, Any]) -> bool:
|
"""将消息加入用户队列"""
|
try:
|
queue_key = self.get_user_queue_key(from_user)
|
message_json = json.dumps(message_data, ensure_ascii=False)
|
|
# 使用LPUSH将消息加入队列头部
|
result = self.redis_client.lpush(queue_key, message_json)
|
|
logger.info(f"消息已加入队列: user={from_user}, queue_length={result}, message={message_json}")
|
return True
|
|
except Exception as e:
|
logger.error(f"消息入队失败: user={from_user}, error={str(e)}")
|
return False
|
|
def dequeue_message(
|
self, from_user: str, timeout: int = 0
|
) -> Optional[Dict[str, Any]]:
|
"""从用户队列中取出消息"""
|
try:
|
queue_key = self.get_user_queue_key(from_user)
|
|
if timeout > 0:
|
# 阻塞式取出消息
|
result = self.redis_client.brpop(queue_key, timeout=timeout)
|
if result:
|
_, message_json = result
|
else:
|
return None
|
else:
|
# 非阻塞式取出消息
|
message_json = self.redis_client.rpop(queue_key)
|
if not message_json:
|
return None
|
|
message_data = json.loads(message_json)
|
logger.info(f"消息已出队: user={from_user}")
|
return message_data
|
|
except Exception as e:
|
logger.error(f"消息出队失败: user={from_user}, error={str(e)}")
|
return None
|
|
def set_processing_status(
|
self, from_user: str, status: bool, ttl: int = 300
|
) -> bool:
|
"""设置用户处理状态"""
|
try:
|
processing_key = self.get_processing_key(from_user)
|
|
if status:
|
# 设置处理中状态,带过期时间
|
self.redis_client.setex(processing_key, ttl, "processing")
|
logger.info(
|
f"设置处理状态: user={from_user}, status=processing, ttl={ttl}"
|
)
|
else:
|
# 清除处理状态
|
self.redis_client.delete(processing_key)
|
logger.info(f"清除处理状态: user={from_user}")
|
|
return True
|
|
except Exception as e:
|
logger.error(f"设置处理状态失败: user={from_user}, error={str(e)}")
|
return False
|
|
def is_processing(self, from_user: str) -> bool:
|
"""检查用户是否正在处理中"""
|
try:
|
processing_key = self.get_processing_key(from_user)
|
return self.redis_client.exists(processing_key) > 0
|
except Exception as e:
|
logger.error(f"检查处理状态失败: user={from_user}, error={str(e)}")
|
return False
|
|
def get_conversation_id(self, from_user: str) -> Optional[str]:
|
"""获取用户的对话ID"""
|
try:
|
conversation_key = self.get_conversation_key(from_user)
|
return self.redis_client.get(conversation_key)
|
except Exception as e:
|
logger.error(f"获取对话ID失败: user={from_user}, error={str(e)}")
|
return None
|
|
def set_conversation_id(
|
self, from_user: str, conversation_id: str, ttl: int = 86400
|
) -> bool:
|
"""设置用户的对话ID"""
|
try:
|
conversation_key = self.get_conversation_key(from_user)
|
self.redis_client.setex(conversation_key, ttl, conversation_id)
|
logger.info(
|
f"设置对话ID: user={from_user}, conversation_id={conversation_id}"
|
)
|
return True
|
except Exception as e:
|
logger.error(f"设置对话ID失败: user={from_user}, error={str(e)}")
|
return False
|
|
def get_queue_length(self, from_user: str) -> int:
|
"""获取用户队列长度"""
|
try:
|
queue_key = self.get_user_queue_key(from_user)
|
return self.redis_client.llen(queue_key)
|
except Exception as e:
|
logger.error(f"获取队列长度失败: user={from_user}, error={str(e)}")
|
return 0
|
|
def clear_user_queue(self, from_user: str) -> bool:
|
"""清空用户队列"""
|
try:
|
queue_key = self.get_user_queue_key(from_user)
|
self.redis_client.delete(queue_key)
|
logger.info(f"清空用户队列: user={from_user}")
|
return True
|
except Exception as e:
|
logger.error(f"清空用户队列失败: user={from_user}, error={str(e)}")
|
return False
|
|
|
# 全局Redis队列管理器实例
|
redis_queue = RedisQueueManager()
|