yj
2025-07-22 2a8d022a20f82c35e9df680dfb66c3c95863b190
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""
Redis队列管理服务
"""
 
import json
from typing import Optional, Dict, Any
import redis
from loguru import logger
from config import settings
 
 
class RedisQueueManager:
    """Redis队列管理器"""
 
    def __init__(self):
        self.redis_client = redis.from_url(settings.redis_url, decode_responses=True)
        self.queue_prefix = "ecloud_queue:"
        self.processing_prefix = "ecloud_processing:"
        self.conversation_prefix = "ecloud_conversation:"
 
    def get_user_queue_key(self, from_user: str) -> str:
        """获取用户队列键"""
        return f"{self.queue_prefix}{from_user}"
 
    def get_processing_key(self, from_user: str) -> str:
        """获取处理状态键"""
        return f"{self.processing_prefix}{from_user}"
 
    def get_conversation_key(self, from_user: str) -> str:
        """获取用户对话ID键"""
        return f"{self.conversation_prefix}{from_user}"
 
    def enqueue_message(self, from_user: str, message_data: Dict[str, Any]) -> bool:
        """将消息加入用户队列"""
        try:
            queue_key = self.get_user_queue_key(from_user)
            message_json = json.dumps(message_data, ensure_ascii=False)
 
            # 使用LPUSH将消息加入队列头部
            result = self.redis_client.lpush(queue_key, message_json)
 
            logger.info(f"消息已加入队列: user={from_user}, queue_length={result}, message={message_json}")
            return True
 
        except Exception as e:
            logger.error(f"消息入队失败: user={from_user}, error={str(e)}")
            return False
 
    def dequeue_message(
        self, from_user: str, timeout: int = 0
    ) -> Optional[Dict[str, Any]]:
        """从用户队列中取出消息"""
        try:
            queue_key = self.get_user_queue_key(from_user)
 
            if timeout > 0:
                # 阻塞式取出消息
                result = self.redis_client.brpop(queue_key, timeout=timeout)
                if result:
                    _, message_json = result
                else:
                    return None
            else:
                # 非阻塞式取出消息
                message_json = self.redis_client.rpop(queue_key)
                if not message_json:
                    return None
 
            message_data = json.loads(message_json)
            logger.info(f"消息已出队: user={from_user}")
            return message_data
 
        except Exception as e:
            logger.error(f"消息出队失败: user={from_user}, error={str(e)}")
            return None
 
    def set_processing_status(
        self, from_user: str, status: bool, ttl: int = 300
    ) -> bool:
        """设置用户处理状态"""
        try:
            processing_key = self.get_processing_key(from_user)
 
            if status:
                # 设置处理中状态,带过期时间
                self.redis_client.setex(processing_key, ttl, "processing")
                logger.info(
                    f"设置处理状态: user={from_user}, status=processing, ttl={ttl}"
                )
            else:
                # 清除处理状态
                self.redis_client.delete(processing_key)
                logger.info(f"清除处理状态: user={from_user}")
 
            return True
 
        except Exception as e:
            logger.error(f"设置处理状态失败: user={from_user}, error={str(e)}")
            return False
 
    def is_processing(self, from_user: str) -> bool:
        """检查用户是否正在处理中"""
        try:
            processing_key = self.get_processing_key(from_user)
            return self.redis_client.exists(processing_key) > 0
        except Exception as e:
            logger.error(f"检查处理状态失败: user={from_user}, error={str(e)}")
            return False
 
    def get_conversation_id(self, from_user: str) -> Optional[str]:
        """获取用户的对话ID"""
        try:
            conversation_key = self.get_conversation_key(from_user)
            return self.redis_client.get(conversation_key)
        except Exception as e:
            logger.error(f"获取对话ID失败: user={from_user}, error={str(e)}")
            return None
 
    def set_conversation_id(
        self, from_user: str, conversation_id: str, ttl: int = 86400
    ) -> bool:
        """设置用户的对话ID"""
        try:
            conversation_key = self.get_conversation_key(from_user)
            self.redis_client.setex(conversation_key, ttl, conversation_id)
            logger.info(
                f"设置对话ID: user={from_user}, conversation_id={conversation_id}"
            )
            return True
        except Exception as e:
            logger.error(f"设置对话ID失败: user={from_user}, error={str(e)}")
            return False
 
    def get_queue_length(self, from_user: str) -> int:
        """获取用户队列长度"""
        try:
            queue_key = self.get_user_queue_key(from_user)
            return self.redis_client.llen(queue_key)
        except Exception as e:
            logger.error(f"获取队列长度失败: user={from_user}, error={str(e)}")
            return 0
 
    def clear_user_queue(self, from_user: str) -> bool:
        """清空用户队列"""
        try:
            queue_key = self.get_user_queue_key(from_user)
            self.redis_client.delete(queue_key)
            logger.info(f"清空用户队列: user={from_user}")
            return True
        except Exception as e:
            logger.error(f"清空用户队列失败: user={from_user}, error={str(e)}")
            return False
 
 
# 全局Redis队列管理器实例
redis_queue = RedisQueueManager()