"""
|
后台消息处理工作进程
|
"""
|
|
import time
|
import threading
|
from typing import Set
|
from loguru import logger
|
from config import settings
|
from app.services.redis_queue import redis_queue
|
from app.services.message_processor import message_processor
|
|
|
class MessageWorker:
|
"""消息处理工作进程"""
|
|
def __init__(self):
|
self.running = False
|
self.worker_threads: Set[threading.Thread] = set()
|
self.active_users: Set[str] = set()
|
self.lock = threading.Lock()
|
|
def start(self):
|
"""启动工作进程"""
|
self.running = True
|
logger.info("消息处理工作进程已启动")
|
|
# 启动主监控线程
|
monitor_thread = threading.Thread(target=self._monitor_queues, daemon=True)
|
monitor_thread.start()
|
|
logger.info("队列监控线程已启动")
|
|
def stop(self):
|
"""停止工作进程"""
|
self.running = False
|
logger.info("正在停止消息处理工作进程...")
|
|
# 等待所有工作线程完成
|
for thread in self.worker_threads:
|
if thread.is_alive():
|
thread.join(timeout=10)
|
|
logger.info("消息处理工作进程已停止")
|
|
def _monitor_queues(self):
|
"""监控队列,为有消息的用户启动处理线程"""
|
while self.running:
|
try:
|
# 使用Redis的SCAN命令扫描所有队列键
|
cursor = 0
|
queue_keys = set()
|
|
while self.running:
|
cursor, keys = self._scan_queue_keys(cursor)
|
queue_keys.update(keys)
|
|
# 如果游标为0,表示扫描完成
|
if cursor == 0:
|
break
|
|
# 为每个有消息的队列启动处理线程
|
for queue_key in queue_keys:
|
if not self.running:
|
break
|
|
# 从队列键中提取用户ID
|
if queue_key.startswith(redis_queue.queue_prefix):
|
from_user = queue_key[len(redis_queue.queue_prefix):]
|
|
# 检查队列是否有消息
|
queue_length = redis_queue.get_queue_length(from_user)
|
if queue_length > 0:
|
logger.info(f"发现用户队列有消息: user={from_user}, length={queue_length}")
|
# 启动用户队列处理
|
self.process_user_queue(from_user)
|
|
# 清理已完成的线程
|
self._cleanup_finished_threads()
|
|
# 休眠一段时间再检查
|
time.sleep(5)
|
|
except Exception as e:
|
logger.error(f"队列监控异常: {str(e)}")
|
time.sleep(10)
|
|
def _scan_queue_keys(self, cursor: int = 0, count: int = 100) -> tuple:
|
"""
|
扫描队列键
|
|
Args:
|
cursor: 扫描游标
|
count: 每次扫描的键数量
|
|
Returns:
|
(新游标, 队列键列表)
|
"""
|
try:
|
# 使用SCAN命令扫描匹配模式的键
|
new_cursor, keys = redis_queue.redis_client.scan(
|
cursor=cursor,
|
match=f"{redis_queue.queue_prefix}*",
|
count=count
|
)
|
return new_cursor, keys
|
except Exception as e:
|
logger.error(f"扫描队列键失败: {str(e)}")
|
return 0, []
|
|
def _cleanup_finished_threads(self):
|
"""清理已完成的线程"""
|
with self.lock:
|
finished_threads = {t for t in self.worker_threads if not t.is_alive()}
|
self.worker_threads -= finished_threads
|
|
def process_user_queue(self, from_user: str):
|
"""
|
处理指定用户的消息队列
|
|
Args:
|
from_user: 用户ID
|
"""
|
# 检查是否已有线程在处理该用户的消息
|
with self.lock:
|
if from_user in self.active_users:
|
logger.info(f"用户队列已在处理中: {from_user}")
|
return
|
|
self.active_users.add(from_user)
|
|
# 启动用户专属处理线程
|
worker_thread = threading.Thread(
|
target=self._process_user_messages, args=(from_user,), daemon=True
|
)
|
worker_thread.start()
|
|
with self.lock:
|
self.worker_threads.add(worker_thread)
|
|
logger.info(f"为用户启动消息处理线程: {from_user}")
|
|
def _process_user_messages(self, from_user: str):
|
"""
|
处理用户消息的工作线程
|
|
Args:
|
from_user: 用户ID
|
"""
|
try:
|
logger.info(f"开始处理用户消息队列: {from_user}")
|
|
# 获取队列长度
|
queue_length = redis_queue.get_queue_length(from_user)
|
logger.info(f"用户队列初始长度: {from_user}, length={queue_length}")
|
|
while self.running:
|
# 检查用户是否正在处理中(防止并发)
|
if redis_queue.is_processing(from_user):
|
logger.info(f"用户正在处理中,等待: {from_user}")
|
time.sleep(5)
|
continue
|
|
# 设置处理状态
|
if not redis_queue.set_processing_status(
|
from_user, True, settings.queue_timeout
|
):
|
logger.error(f"设置处理状态失败: {from_user}")
|
break
|
|
try:
|
# 从队列中取出消息
|
message_data = redis_queue.dequeue_message(from_user, timeout=10)
|
|
if not message_data:
|
# 队列为空,清除处理状态并退出
|
redis_queue.set_processing_status(from_user, False)
|
logger.info(f"用户队列为空,结束处理: {from_user}")
|
break
|
|
# 处理消息
|
success = message_processor.process_single_message(message_data)
|
|
if success:
|
logger.info(f"消息处理成功: {from_user}")
|
# 更新队列长度信息
|
queue_length = redis_queue.get_queue_length(from_user)
|
logger.info(f"用户队列剩余长度: {from_user}, length={queue_length}")
|
else:
|
logger.error(f"消息处理失败: {from_user}")
|
# 可以考虑将失败的消息重新入队或记录到错误队列
|
|
except Exception as e:
|
logger.error(f"处理用户消息异常: {from_user}, error={str(e)}")
|
|
finally:
|
# 清除处理状态
|
redis_queue.set_processing_status(from_user, False)
|
|
# 短暂休眠,避免过于频繁的处理
|
time.sleep(1)
|
|
except Exception as e:
|
logger.error(f"用户消息处理线程异常: {from_user}, error={str(e)}")
|
|
finally:
|
# 清理活跃用户记录
|
with self.lock:
|
self.active_users.discard(from_user)
|
|
logger.info(f"用户消息处理线程结束: {from_user}")
|
|
|
# 全局消息工作进程实例
|
message_worker = MessageWorker()
|