import redis
|
import json
|
import time
|
from config_manager import global_config
|
|
|
class RedisManager:
    """Thin wrapper around a Redis client for group, message, stream and msgid keys.

    Every key is namespaced with ``config.REDIS_KEY_PREFIX``.
    """

    # Lifetime, in seconds, of all per-stream keys (marker, chunks, status,
    # lock, conversation id).
    STREAM_TTL_SECONDS = 180

    # Group data types whose "no data" result is an empty list rather than None.
    _SET_DATA_TYPES = frozenset({"users", "sms_accounts"})

    def __init__(self):
        """Load the global configuration and create the Redis client."""
        self.config = global_config.get_config()
        self.redis = redis.StrictRedis(
            host=self.config.REDIS_HOST,
            port=self.config.REDIS_PORT,
            db=self.config.REDIS_DB,
            password=self.config.REDIS_PASSWORD,
            decode_responses=True,
            max_connections=100,
            socket_timeout=5,
        )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _key(self, suffix):
        """Return ``suffix`` prefixed with the configured key namespace."""
        return f"{self.config.REDIS_KEY_PREFIX}:{suffix}"

    def _set_with_ttl(self, key, value, ttl):
        """Store ``value`` at ``key`` with an expiry of ``ttl`` seconds.

        A single ``SET ... EX`` is used instead of ``SET`` followed by
        ``EXPIRE`` so the key can never be left behind without a TTL if the
        second command fails.
        """
        self.redis.set(key, value, ex=ttl)

    # ------------------------------------------------------------------
    # Group data
    # ------------------------------------------------------------------

    def cache_group_data(self, group_id, data_type, data):
        """Cache data belonging to a group.

        A ``list`` replaces the Redis set stored at the key (an empty list
        just removes the key); any other value is stored as a JSON string.
        """
        key = self._key(f"group:{group_id}:{data_type}")
        if isinstance(data, list):
            self.redis.delete(key)
            if data:
                self.redis.sadd(key, *data)
        else:
            self.redis.set(key, json.dumps(data))

    def get_group_data(self, group_id, data_type):
        """Return cached group data.

        The lookup follows the type actually stored at the key, mirroring how
        ``cache_group_data`` chose it, so a list cached under any
        ``data_type`` is read back as a set instead of failing with a
        wrong-type error.

        Returns:
            The set of members for set-backed data, the decoded JSON value
            for string-backed data. When nothing usable is stored: ``[]`` for
            the ``users`` / ``sms_accounts`` data types (case-insensitive),
            otherwise ``None``.
        """
        key = self._key(f"group:{group_id}:{data_type}")
        stored_type = self.redis.type(key)
        if stored_type == "set":
            return self.redis.smembers(key) or []
        if stored_type == "string":
            raw = self.redis.get(key)
            return json.loads(raw) if raw else None
        # Key is missing (or holds a type this class never writes here).
        return [] if data_type.lower() in self._SET_DATA_TYPES else None

    # ------------------------------------------------------------------
    # Per-user conversation history
    # ------------------------------------------------------------------

    def add_user_message(self, user_id, role, content):
        """Append a message to the user's history and refresh its expiry.

        The message is stored as JSON with ``role``, ``content`` and the
        current ``timestamp``; the list expires after
        ``config.MESSAGE_EXPIRE`` seconds.
        """
        key = self._key(f"user_messages:{user_id}:messages")
        message = {
            "role": role,
            "content": content,
            "timestamp": time.time(),
        }
        self.redis.rpush(key, json.dumps(message))
        self.redis.expire(key, self.config.MESSAGE_EXPIRE)

    def add_conversation_id(self, user_id, conversation_id):
        """Store the user's conversation id, expiring after ``MESSAGE_EXPIRE``."""
        key = self._key(f"user_messages:{user_id}:conversation_id")
        self._set_with_ttl(key, conversation_id, self.config.MESSAGE_EXPIRE)

    def get_user_messages(self, user_id):
        """Return all of the user's stored messages, oldest first."""
        key = self._key(f"user_messages:{user_id}:messages")
        return [json.loads(raw) for raw in self.redis.lrange(key, 0, -1)]

    def get_user_conversation_id(self, user_id):
        """Return the user's stored conversation id, or ``None`` if absent."""
        key = self._key(f"user_messages:{user_id}:conversation_id")
        return self.redis.get(key)

    def clear_user_messages(self, user_id):
        """Delete the user's message history and conversation id."""
        base = f"user_messages:{user_id}"
        # One DEL with both keys instead of two round trips.
        self.redis.delete(
            self._key(f"{base}:messages"),
            self._key(f"{base}:conversation_id"),
        )

    # ------------------------------------------------------------------
    # Streams
    # ------------------------------------------------------------------

    def add_stream(self, stream_id):
        """Create the stream's marker key (value ``0``) with the stream TTL."""
        key = self._key(f"stream:{stream_id}")
        self._set_with_ttl(key, 0, self.STREAM_TTL_SECONDS)

    def add_stream_chunks(self, stream_id, chunks):
        """Push ``chunks`` onto the stream's chunk list and refresh its TTL.

        Chunks are pushed on the left and read from the right by
        ``get_stream_chunk``, so they come back in the order given here.
        An empty ``chunks`` is a no-op: LPUSH requires at least one value.
        """
        if not chunks:
            return
        key = self._key(f"stream:{stream_id}:chunks")
        self.redis.lpush(key, *chunks)
        self.redis.expire(key, self.STREAM_TTL_SECONDS)

    def get_stream_chunk(self, stream_id):
        """Pop and return the oldest pending chunk, or ``None`` if none remain."""
        key = self._key(f"stream:{stream_id}:chunks")
        return self.redis.rpop(key)

    def add_stream_status(self, stream_id, status):
        """Store the stream's status with the stream TTL."""
        key = self._key(f"stream:{stream_id}:status")
        self._set_with_ttl(key, status, self.STREAM_TTL_SECONDS)

    def get_stream_status(self, stream_id):
        """Return the stream's stored status, or ``None`` if absent."""
        key = self._key(f"stream:{stream_id}:status")
        return self.redis.get(key)

    def add_stream_lock(self, stream_id, lock):
        """Store ``lock`` under the stream's lock key with the stream TTL."""
        key = self._key(f"stream:{stream_id}:lock")
        self._set_with_ttl(key, lock, self.STREAM_TTL_SECONDS)

    def add_stream_conversation_id(self, stream_id, conversation_id):
        """Store the stream's conversation id with the stream TTL."""
        key = self._key(f"stream:{stream_id}:conversation_id")
        self._set_with_ttl(key, conversation_id, self.STREAM_TTL_SECONDS)

    def get_stream_conversation_id(self, stream_id):
        """Return the stream's stored conversation id, or ``None`` if absent."""
        key = self._key(f"stream:{stream_id}:conversation_id")
        return self.redis.get(key)

    def exists_stream(self, stream_id):
        """Return whether the stream's lock key exists (Redis EXISTS count)."""
        key = self._key(f"stream:{stream_id}:lock")
        return self.redis.exists(key)

    def exists_stream_chunks(self, stream_id):
        """Return whether the stream's chunk list exists (Redis EXISTS count)."""
        key = self._key(f"stream:{stream_id}:chunks")
        return self.redis.exists(key)

    def delete_stream_chunks(self, stream_id):
        """Delete every ``<stream key>:*`` sub-key of the stream.

        The bare marker key written by ``add_stream`` does not match the
        pattern and is left to expire on its own.
        """
        pattern = self._key(f"stream:{stream_id}:*")
        # NOTE(review): SCAN walks the whole keyspace to find these keys;
        # consider deleting the known suffixes directly if that is all of them.
        matched = list(self.redis.scan_iter(match=pattern))
        if matched:
            self.redis.delete(*matched)

    # ------------------------------------------------------------------
    # WeCom callback message ids
    # ------------------------------------------------------------------

    def add_wecom_msgid(self, msgid):
        """Register a WeCom callback msgid (value ``0``), expiring after ``MSGID_EXPIRE``."""
        key = self._key(f"msgid:{msgid}")
        self._set_with_ttl(key, 0, self.config.MSGID_EXPIRE)

    def update_wecom_msgid(self, msgid, ai_answer):
        """Replace the msgid's value with ``ai_answer`` and reset its expiry."""
        key = self._key(f"msgid:{msgid}")
        self._set_with_ttl(key, ai_answer, self.config.MSGID_EXPIRE)

    def delete_wecom_msgid(self, msgid):
        """Delete the WeCom callback msgid key."""
        key = self._key(f"msgid:{msgid}")
        self.redis.delete(key)

    def get_wecom_msgid(self, msgid):
        """Return the value stored for the msgid, or ``None`` if absent."""
        key = self._key(f"msgid:{msgid}")
        return self.redis.get(key)

    def exists_wecom_msgid(self, msgid):
        """Return whether the msgid key exists (Redis EXISTS count)."""
        key = self._key(f"msgid:{msgid}")
        return self.redis.exists(key)
|