yj
2026-03-31 033d919018b3a3e12755f008c0b9093364942512
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import redis
import json
import time
from config_manager import global_config
 
 
class RedisManager:
    def __init__(self):
        self.config = global_config.get_config()
        self.redis = redis.StrictRedis(
            host=self.config.REDIS_HOST,
            port=self.config.REDIS_PORT,
            db=self.config.REDIS_DB,
            password=self.config.REDIS_PASSWORD,
            decode_responses=True,
            max_connections=100,
            socket_timeout=5
        )
 
    def cache_group_data(self, group_id, data_type, data):
        """缓存群组相关数据"""
        key = f"{self.config.REDIS_KEY_PREFIX}:group:{group_id}:{data_type}"
        if isinstance(data, list):
            self.redis.delete(key)
            if data:
                self.redis.sadd(key, *data)
        else:
            self.redis.set(key, json.dumps(data))
 
    def get_group_data(self, group_id, data_type):
        """获取群组相关数据"""
        key = f"{self.config.REDIS_KEY_PREFIX}:group:{group_id}:{data_type}"
        data_type = data_type.lower()
 
        if data_type in ['users', 'sms_accounts']:
            return self.redis.smembers(key) or []
        else:
            data = self.redis.get(key)
            return json.loads(data) if data else None
 
    def add_user_message(self, user_id, role, content):
        """添加用户消息到Redis"""
        key = f"{self.config.REDIS_KEY_PREFIX}:user_messages:{user_id}:messages"
        message = {
            "role": role,
            "content": content,
            "timestamp": time.time()
        }
        self.redis.rpush(key, json.dumps(message))
        self.redis.expire(key, self.config.MESSAGE_EXPIRE)
 
    def add_conversation_id(self, user_id, conversation_id):
        """添加会话id"""
        key = f"{self.config.REDIS_KEY_PREFIX}:user_messages:{user_id}:conversation_id"
        self.redis.set(key, conversation_id)
        self.redis.expire(key, self.config.MESSAGE_EXPIRE)
 
    def get_user_messages(self, user_id):
        """获取用户的所有消息"""
        key = f"{self.config.REDIS_KEY_PREFIX}:user_messages:{user_id}:messages"
        messages = self.redis.lrange(key, 0, -1)
        return [json.loads(msg) for msg in messages] if messages else []
 
    def get_user_conversation_id(self, user_id):
        """获取用户的conversation_id"""
        key = f"{self.config.REDIS_KEY_PREFIX}:user_messages:{user_id}:conversation_id"
        return self.redis.get(key)
 
    def clear_user_messages(self, user_id):
        """清除用户的消息缓存"""
        key_prefix = f"{self.config.REDIS_KEY_PREFIX}:user_messages:{user_id}"
        key1 = f"{key_prefix}:messages"
        key2 = f"{key_prefix}:conversation_id"
        self.redis.delete(key1)
        self.redis.delete(key2)
 
    def add_stream(self, stream_id):
        """添加流列表"""
        key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}"
        self.redis.set(key, 0)
        self.redis.expire(key, 180)
 
    def add_stream_chunks(self, stream_id, chunks):
        """添加流列表"""
        key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:chunks"
        self.redis.lpush(key, *chunks)
        self.redis.expire(key, 180)
 
    def get_stream_chunk(self, stream_id):
        """获取流块"""
        key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:chunks"
        return self.redis.rpop(key)
 
    def add_stream_status(self, stream_id, status):
        """添加ai请求的状态"""
        key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:status"
        self.redis.set(key, status)
        self.redis.expire(key, 180)
 
    def get_stream_status(self, stream_id):
        """添加ai请求的状态"""
        key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:status"
        return self.redis.get(key)
 
    def add_stream_lock(self, stream_id, lock):
        """添加lock"""
        key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:lock"
        self.redis.set(key, lock)
        self.redis.expire(key, 180)
 
    def add_stream_conversation_id(self, stream_id, conversation_id):
        """添加ai请求的状态"""
        key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:conversation_id"
        self.redis.set(key, conversation_id)
        self.redis.expire(key, 180)
 
    def get_stream_conversation_id(self, stream_id):
        """添加ai请求的状态"""
        key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:conversation_id"
        return self.redis.get(key)
 
    def exists_stream(self, stream_id):
        """检查键是否存在"""
        key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:lock"
        return self.redis.exists(key)
 
    def exists_stream_chunks(self, stream_id):
        """检查键是否存在"""
        key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:chunks"
        return self.redis.exists(key)
 
    def delete_stream_chunks(self, stream_id):
        """删除stream_id"""
        pattern = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:*"
        count = 0
        for key in self.redis.scan_iter(match=pattern):
            self.redis.delete(key)
            count += 1
 
    def add_wecom_msgid(self, msgid):
        """添加企业微信回调msgid"""
        key = f"{self.config.REDIS_KEY_PREFIX}:msgid:{msgid}"
        self.redis.set(key, 0)
        self.redis.expire(key, self.config.MSGID_EXPIRE)
 
    def update_wecom_msgid(self, msgid, ai_answer):
        """更新企业微信回调msgid"""
        key = f"{self.config.REDIS_KEY_PREFIX}:msgid:{msgid}"
        self.redis.set(key, ai_answer)
        self.redis.expire(key, self.config.MSGID_EXPIRE)
 
    def delete_wecom_msgid(self, msgid):
        """删除企业微信回调msgid"""
        key = f"{self.config.REDIS_KEY_PREFIX}:msgid:{msgid}"
        self.redis.delete(key)
 
    def get_wecom_msgid(self, msgid):
        """获取msgid"""
        key = f"{self.config.REDIS_KEY_PREFIX}:msgid:{msgid}"
        return self.redis.get(key)
 
    def exists_wecom_msgid(self, msgid):
        """msgid是否存在"""
        key = f"{self.config.REDIS_KEY_PREFIX}:msgid:{msgid}"
        return self.redis.exists(key)