""" 后台消息处理工作进程 """ import time import threading from typing import Set from loguru import logger from config import settings from app.services.redis_queue import redis_queue from app.services.message_processor import message_processor class MessageWorker: """消息处理工作进程""" def __init__(self): self.running = False self.worker_threads: Set[threading.Thread] = set() self.active_users: Set[str] = set() self.lock = threading.Lock() def start(self): """启动工作进程""" self.running = True logger.info("消息处理工作进程已启动") # 启动主监控线程 monitor_thread = threading.Thread(target=self._monitor_queues, daemon=True) monitor_thread.start() logger.info("队列监控线程已启动") def stop(self): """停止工作进程""" self.running = False logger.info("正在停止消息处理工作进程...") # 等待所有工作线程完成 for thread in self.worker_threads: if thread.is_alive(): thread.join(timeout=10) logger.info("消息处理工作进程已停止") def _monitor_queues(self): """监控队列,为有消息的用户启动处理线程""" while self.running: try: # 这里可以实现更复杂的队列发现机制 # 目前简化为检查已知的活跃用户 # 在实际应用中,可以通过Redis的SCAN命令扫描所有队列 # 清理已完成的线程 self._cleanup_finished_threads() # 休眠一段时间再检查 time.sleep(5) except Exception as e: logger.error(f"队列监控异常: {str(e)}") time.sleep(10) def _cleanup_finished_threads(self): """清理已完成的线程""" with self.lock: finished_threads = {t for t in self.worker_threads if not t.is_alive()} self.worker_threads -= finished_threads def process_user_queue(self, from_user: str): """ 处理指定用户的消息队列 Args: from_user: 用户ID """ # 检查是否已有线程在处理该用户的消息 with self.lock: if from_user in self.active_users: logger.info(f"用户队列已在处理中: {from_user}") return self.active_users.add(from_user) # 启动用户专属处理线程 worker_thread = threading.Thread( target=self._process_user_messages, args=(from_user,), daemon=True ) worker_thread.start() with self.lock: self.worker_threads.add(worker_thread) logger.info(f"为用户启动消息处理线程: {from_user}") def _process_user_messages(self, from_user: str): """ 处理用户消息的工作线程 Args: from_user: 用户ID """ try: logger.info(f"开始处理用户消息队列: {from_user}") while self.running: # 检查用户是否正在处理中(防止并发) if redis_queue.is_processing(from_user): logger.info(f"用户正在处理中,等待: {from_user}") time.sleep(5) continue # 设置处理状态 if not redis_queue.set_processing_status( from_user, True, settings.queue_timeout ): logger.error(f"设置处理状态失败: {from_user}") break try: # 从队列中取出消息 message_data = redis_queue.dequeue_message(from_user, timeout=10) if not message_data: # 队列为空,清除处理状态并退出 redis_queue.set_processing_status(from_user, False) logger.info(f"用户队列为空,结束处理: {from_user}") break # 处理消息 success = message_processor.process_single_message(message_data) if success: logger.info(f"消息处理成功: {from_user}") else: logger.error(f"消息处理失败: {from_user}") # 可以考虑将失败的消息重新入队或记录到错误队列 except Exception as e: logger.error(f"处理用户消息异常: {from_user}, error={str(e)}") finally: # 清除处理状态 redis_queue.set_processing_status(from_user, False) # 短暂休眠,避免过于频繁的处理 time.sleep(1) except Exception as e: logger.error(f"用户消息处理线程异常: {from_user}, error={str(e)}") finally: # 清理活跃用户记录 with self.lock: self.active_users.discard(from_user) logger.info(f"用户消息处理线程结束: {from_user}") # 全局消息工作进程实例 message_worker = MessageWorker()