import redis import json import time from config_manager import global_config class RedisManager: def __init__(self): self.config = global_config.get_config() self.redis = redis.StrictRedis( host=self.config.REDIS_HOST, port=self.config.REDIS_PORT, db=self.config.REDIS_DB, password=self.config.REDIS_PASSWORD, decode_responses=True, max_connections=100, socket_timeout=5 ) def cache_group_data(self, group_id, data_type, data): """缓存群组相关数据""" key = f"{self.config.REDIS_KEY_PREFIX}:group:{group_id}:{data_type}" if isinstance(data, list): self.redis.delete(key) if data: self.redis.sadd(key, *data) else: self.redis.set(key, json.dumps(data)) def get_group_data(self, group_id, data_type): """获取群组相关数据""" key = f"{self.config.REDIS_KEY_PREFIX}:group:{group_id}:{data_type}" data_type = data_type.lower() if data_type in ['users', 'sms_accounts']: return self.redis.smembers(key) or [] else: data = self.redis.get(key) return json.loads(data) if data else None def add_user_message(self, user_id, role, content): """添加用户消息到Redis""" key = f"{self.config.REDIS_KEY_PREFIX}:user_messages:{user_id}:messages" message = { "role": role, "content": content, "timestamp": time.time() } self.redis.rpush(key, json.dumps(message)) self.redis.expire(key, self.config.MESSAGE_EXPIRE) def add_conversation_id(self, user_id, conversation_id): """添加会话id""" key = f"{self.config.REDIS_KEY_PREFIX}:user_messages:{user_id}:conversation_id" self.redis.set(key, conversation_id) self.redis.expire(key, self.config.MESSAGE_EXPIRE) def get_user_messages(self, user_id): """获取用户的所有消息""" key = f"{self.config.REDIS_KEY_PREFIX}:user_messages:{user_id}:messages" messages = self.redis.lrange(key, 0, -1) return [json.loads(msg) for msg in messages] if messages else [] def get_user_conversation_id(self, user_id): """获取用户的conversation_id""" key = f"{self.config.REDIS_KEY_PREFIX}:user_messages:{user_id}:conversation_id" return self.redis.get(key) def clear_user_messages(self, user_id): """清除用户的消息缓存""" key_prefix = f"{self.config.REDIS_KEY_PREFIX}:user_messages:{user_id}" key1 = f"{key_prefix}:messages" key2 = f"{key_prefix}:conversation_id" self.redis.delete(key1) self.redis.delete(key2) def add_stream(self, stream_id): """添加流列表""" key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}" self.redis.set(key, 0) self.redis.expire(key, 180) def add_stream_chunks(self, stream_id, chunks): """添加流列表""" key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:chunks" self.redis.lpush(key, *chunks) self.redis.expire(key, 180) def get_stream_chunk(self, stream_id): """获取流块""" key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:chunks" return self.redis.rpop(key) def add_stream_status(self, stream_id, status): """添加ai请求的状态""" key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:status" self.redis.set(key, status) self.redis.expire(key, 180) def get_stream_status(self, stream_id): """添加ai请求的状态""" key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:status" return self.redis.get(key) def add_stream_lock(self, stream_id, lock): """添加lock""" key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:lock" self.redis.set(key, lock) self.redis.expire(key, 180) def add_stream_conversation_id(self, stream_id, conversation_id): """添加ai请求的状态""" key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:conversation_id" self.redis.set(key, conversation_id) self.redis.expire(key, 180) def get_stream_conversation_id(self, stream_id): """添加ai请求的状态""" key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:conversation_id" return self.redis.get(key) def exists_stream(self, stream_id): """检查键是否存在""" key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:lock" return self.redis.exists(key) def exists_stream_chunks(self, stream_id): """检查键是否存在""" key = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:chunks" return self.redis.exists(key) def delete_stream_chunks(self, stream_id): """删除stream_id""" pattern = f"{self.config.REDIS_KEY_PREFIX}:stream:{stream_id}:*" count = 0 for key in self.redis.scan_iter(match=pattern): self.redis.delete(key) count += 1 def add_wecom_msgid(self, msgid): """添加企业微信回调msgid""" key = f"{self.config.REDIS_KEY_PREFIX}:msgid:{msgid}" self.redis.set(key, 0) self.redis.expire(key, self.config.MSGID_EXPIRE) def update_wecom_msgid(self, msgid, ai_answer): """更新企业微信回调msgid""" key = f"{self.config.REDIS_KEY_PREFIX}:msgid:{msgid}" self.redis.set(key, ai_answer) self.redis.expire(key, self.config.MSGID_EXPIRE) def delete_wecom_msgid(self, msgid): """删除企业微信回调msgid""" key = f"{self.config.REDIS_KEY_PREFIX}:msgid:{msgid}" self.redis.delete(key) def get_wecom_msgid(self, msgid): """获取msgid""" key = f"{self.config.REDIS_KEY_PREFIX}:msgid:{msgid}" return self.redis.get(key) def exists_wecom_msgid(self, msgid): """msgid是否存在""" key = f"{self.config.REDIS_KEY_PREFIX}:msgid:{msgid}" return self.redis.exists(key)