"""
|
后台消息处理工作进程
|
"""
|
|
import time
|
import threading
|
from typing import Set
|
from loguru import logger
|
from config import settings
|
from app.services.redis_queue import redis_queue
|
from app.services.message_processor import message_processor
|
|
|
class MessageWorker:
|
"""消息处理工作进程"""
|
|
def __init__(self):
|
self.running = False
|
self.worker_threads: Set[threading.Thread] = set()
|
self.active_users: Set[str] = set()
|
self.lock = threading.Lock()
|
|
def start(self):
|
"""启动工作进程"""
|
self.running = True
|
logger.info("消息处理工作进程已启动")
|
|
# 启动主监控线程
|
monitor_thread = threading.Thread(target=self._monitor_queues, daemon=True)
|
monitor_thread.start()
|
|
logger.info("队列监控线程已启动")
|
|
def stop(self):
|
"""停止工作进程"""
|
self.running = False
|
logger.info("正在停止消息处理工作进程...")
|
|
# 等待所有工作线程完成
|
for thread in self.worker_threads:
|
if thread.is_alive():
|
thread.join(timeout=10)
|
|
logger.info("消息处理工作进程已停止")
|
|
def _monitor_queues(self):
|
"""监控队列,为有消息的用户启动处理线程"""
|
while self.running:
|
try:
|
# 这里可以实现更复杂的队列发现机制
|
# 目前简化为检查已知的活跃用户
|
# 在实际应用中,可以通过Redis的SCAN命令扫描所有队列
|
|
# 清理已完成的线程
|
self._cleanup_finished_threads()
|
|
# 休眠一段时间再检查
|
time.sleep(5)
|
|
except Exception as e:
|
logger.error(f"队列监控异常: {str(e)}")
|
time.sleep(10)
|
|
def _cleanup_finished_threads(self):
|
"""清理已完成的线程"""
|
with self.lock:
|
finished_threads = {t for t in self.worker_threads if not t.is_alive()}
|
self.worker_threads -= finished_threads
|
|
def process_user_queue(self, from_user: str):
|
"""
|
处理指定用户的消息队列
|
|
Args:
|
from_user: 用户ID
|
"""
|
# 检查是否已有线程在处理该用户的消息
|
with self.lock:
|
if from_user in self.active_users:
|
logger.info(f"用户队列已在处理中: {from_user}")
|
return
|
|
self.active_users.add(from_user)
|
|
# 启动用户专属处理线程
|
worker_thread = threading.Thread(
|
target=self._process_user_messages, args=(from_user,), daemon=True
|
)
|
worker_thread.start()
|
|
with self.lock:
|
self.worker_threads.add(worker_thread)
|
|
logger.info(f"为用户启动消息处理线程: {from_user}")
|
|
def _process_user_messages(self, from_user: str):
|
"""
|
处理用户消息的工作线程
|
|
Args:
|
from_user: 用户ID
|
"""
|
try:
|
logger.info(f"开始处理用户消息队列: {from_user}")
|
|
while self.running:
|
# 检查用户是否正在处理中(防止并发)
|
if redis_queue.is_processing(from_user):
|
logger.info(f"用户正在处理中,等待: {from_user}")
|
time.sleep(5)
|
continue
|
|
# 设置处理状态
|
if not redis_queue.set_processing_status(
|
from_user, True, settings.queue_timeout
|
):
|
logger.error(f"设置处理状态失败: {from_user}")
|
break
|
|
try:
|
# 从队列中取出消息
|
message_data = redis_queue.dequeue_message(from_user, timeout=10)
|
|
if not message_data:
|
# 队列为空,清除处理状态并退出
|
redis_queue.set_processing_status(from_user, False)
|
logger.info(f"用户队列为空,结束处理: {from_user}")
|
break
|
|
# 处理消息
|
success = message_processor.process_single_message(message_data)
|
|
if success:
|
logger.info(f"消息处理成功: {from_user}")
|
else:
|
logger.error(f"消息处理失败: {from_user}")
|
# 可以考虑将失败的消息重新入队或记录到错误队列
|
|
except Exception as e:
|
logger.error(f"处理用户消息异常: {from_user}, error={str(e)}")
|
|
finally:
|
# 清除处理状态
|
redis_queue.set_processing_status(from_user, False)
|
|
# 短暂休眠,避免过于频繁的处理
|
time.sleep(1)
|
|
except Exception as e:
|
logger.error(f"用户消息处理线程异常: {from_user}, error={str(e)}")
|
|
finally:
|
# 清理活跃用户记录
|
with self.lock:
|
self.active_users.discard(from_user)
|
|
logger.info(f"用户消息处理线程结束: {from_user}")
|
|
|
# 全局消息工作进程实例
|
message_worker = MessageWorker()
|