yj
2025-07-23 1225b6cbf0a028b765a0ab6d784bcb80459a67bb
app/workers/message_worker.py
@@ -47,10 +47,34 @@
        """监控队列,为有消息的用户启动处理线程"""
        while self.running:
            try:
                # 这里可以实现更复杂的队列发现机制
                # 目前简化为检查已知的活跃用户
                # 在实际应用中,可以通过Redis的SCAN命令扫描所有队列
                # 使用Redis的SCAN命令扫描所有队列键
                cursor = 0
                queue_keys = set()
                while self.running:
                    cursor, keys = self._scan_queue_keys(cursor)
                    queue_keys.update(keys)
                    # 如果游标为0,表示扫描完成
                    if cursor == 0:
                        break
                # 为每个有消息的队列启动处理线程
                for queue_key in queue_keys:
                    if not self.running:
                        break
                    # 从队列键中提取用户ID
                    if queue_key.startswith(redis_queue.queue_prefix):
                        from_user = queue_key[len(redis_queue.queue_prefix):]
                        # 检查队列是否有消息
                        queue_length = redis_queue.get_queue_length(from_user)
                        if queue_length > 0:
                            logger.info(f"发现用户队列有消息: user={from_user}, length={queue_length}")
                            # 启动用户队列处理
                            self.process_user_queue(from_user)
                # 清理已完成的线程
                self._cleanup_finished_threads()
@@ -60,6 +84,29 @@
            except Exception as e:
                logger.error(f"队列监控异常: {str(e)}")
                time.sleep(10)
    def _scan_queue_keys(self, cursor: int = 0, count: int = 100) -> tuple:
        """
        扫描队列键
        Args:
            cursor: 扫描游标
            count: 每次扫描的键数量
        Returns:
            (新游标, 队列键列表)
        """
        try:
            # 使用SCAN命令扫描匹配模式的键
            new_cursor, keys = redis_queue.redis_client.scan(
                cursor=cursor,
                match=f"{redis_queue.queue_prefix}*",
                count=count
            )
            return new_cursor, keys
        except Exception as e:
            logger.error(f"扫描队列键失败: {str(e)}")
            return 0, []
    def _cleanup_finished_threads(self):
        """清理已完成的线程"""
@@ -102,6 +149,10 @@
        """
        try:
            logger.info(f"开始处理用户消息队列: {from_user}")
            # 获取队列长度
            queue_length = redis_queue.get_queue_length(from_user)
            logger.info(f"用户队列初始长度: {from_user}, length={queue_length}")
            while self.running:
                # 检查用户是否正在处理中(防止并发)
@@ -132,6 +183,9 @@
                    if success:
                        logger.info(f"消息处理成功: {from_user}")
                        # 更新队列长度信息
                        queue_length = redis_queue.get_queue_length(from_user)
                        logger.info(f"用户队列剩余长度: {from_user}, length={queue_length}")
                    else:
                        logger.error(f"消息处理失败: {from_user}")
                        # 可以考虑将失败的消息重新入队或记录到错误队列