| | |
| | | """监控队列,为有消息的用户启动处理线程""" |
| | | while self.running: |
| | | try: |
| | | # 这里可以实现更复杂的队列发现机制 |
| | | # 目前简化为检查已知的活跃用户 |
| | | # 在实际应用中,可以通过Redis的SCAN命令扫描所有队列 |
| | | # 使用Redis的SCAN命令扫描所有队列键 |
| | | cursor = 0 |
| | | queue_keys = set() |
| | | |
| | | while self.running: |
| | | cursor, keys = self._scan_queue_keys(cursor) |
| | | queue_keys.update(keys) |
| | | |
| | | # 如果游标为0,表示扫描完成 |
| | | if cursor == 0: |
| | | break |
| | | |
| | | # 为每个有消息的队列启动处理线程 |
| | | for queue_key in queue_keys: |
| | | if not self.running: |
| | | break |
| | | |
| | | # 从队列键中提取用户ID |
| | | if queue_key.startswith(redis_queue.queue_prefix): |
| | | from_user = queue_key[len(redis_queue.queue_prefix):] |
| | | |
| | | # 检查队列是否有消息 |
| | | queue_length = redis_queue.get_queue_length(from_user) |
| | | if queue_length > 0: |
| | | logger.info(f"发现用户队列有消息: user={from_user}, length={queue_length}") |
| | | # 启动用户队列处理 |
| | | self.process_user_queue(from_user) |
| | | |
| | | # 清理已完成的线程 |
| | | self._cleanup_finished_threads() |
| | |
| | | except Exception as e: |
| | | logger.error(f"队列监控异常: {str(e)}") |
| | | time.sleep(10) |
| | | |
| | | def _scan_queue_keys(self, cursor: int = 0, count: int = 100) -> tuple: |
| | | """ |
| | | 扫描队列键 |
| | | |
| | | Args: |
| | | cursor: 扫描游标 |
| | | count: 每次扫描的键数量 |
| | | |
| | | Returns: |
| | | (新游标, 队列键列表) |
| | | """ |
| | | try: |
| | | # 使用SCAN命令扫描匹配模式的键 |
| | | new_cursor, keys = redis_queue.redis_client.scan( |
| | | cursor=cursor, |
| | | match=f"{redis_queue.queue_prefix}*", |
| | | count=count |
| | | ) |
| | | return new_cursor, keys |
| | | except Exception as e: |
| | | logger.error(f"扫描队列键失败: {str(e)}") |
| | | return 0, [] |
| | | |
| | | def _cleanup_finished_threads(self): |
| | | """清理已完成的线程""" |
| | |
| | | try: |
| | | logger.info(f"开始处理用户消息队列: {from_user}") |
| | | |
| | | # 获取队列长度 |
| | | queue_length = redis_queue.get_queue_length(from_user) |
| | | logger.info(f"用户队列初始长度: {from_user}, length={queue_length}") |
| | | |
| | | while self.running: |
| | | # 检查用户是否正在处理中(防止并发) |
| | | if redis_queue.is_processing(from_user): |
| | |
| | | |
| | | if success: |
| | | logger.info(f"消息处理成功: {from_user}") |
| | | # 更新队列长度信息 |
| | | queue_length = redis_queue.get_queue_length(from_user) |
| | | logger.info(f"用户队列剩余长度: {from_user}, length={queue_length}") |
| | | else: |
| | | logger.error(f"消息处理失败: {from_user}") |
| | | # 可以考虑将失败的消息重新入队或记录到错误队列 |