yj
2025-07-22 2a8d022a20f82c35e9df680dfb66c3c95863b190
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""
后台消息处理工作进程
"""
 
import time
import threading
from typing import Set
from loguru import logger
from config import settings
from app.services.redis_queue import redis_queue
from app.services.message_processor import message_processor
 
 
class MessageWorker:
    """消息处理工作进程"""
 
    def __init__(self):
        self.running = False
        self.worker_threads: Set[threading.Thread] = set()
        self.active_users: Set[str] = set()
        self.lock = threading.Lock()
 
    def start(self):
        """启动工作进程"""
        self.running = True
        logger.info("消息处理工作进程已启动")
 
        # 启动主监控线程
        monitor_thread = threading.Thread(target=self._monitor_queues, daemon=True)
        monitor_thread.start()
 
        logger.info("队列监控线程已启动")
 
    def stop(self):
        """停止工作进程"""
        self.running = False
        logger.info("正在停止消息处理工作进程...")
 
        # 等待所有工作线程完成
        for thread in self.worker_threads:
            if thread.is_alive():
                thread.join(timeout=10)
 
        logger.info("消息处理工作进程已停止")
 
    def _monitor_queues(self):
        """监控队列,为有消息的用户启动处理线程"""
        while self.running:
            try:
                # 这里可以实现更复杂的队列发现机制
                # 目前简化为检查已知的活跃用户
                # 在实际应用中,可以通过Redis的SCAN命令扫描所有队列
 
                # 清理已完成的线程
                self._cleanup_finished_threads()
 
                # 休眠一段时间再检查
                time.sleep(5)
 
            except Exception as e:
                logger.error(f"队列监控异常: {str(e)}")
                time.sleep(10)
 
    def _cleanup_finished_threads(self):
        """清理已完成的线程"""
        with self.lock:
            finished_threads = {t for t in self.worker_threads if not t.is_alive()}
            self.worker_threads -= finished_threads
 
    def process_user_queue(self, from_user: str):
        """
        处理指定用户的消息队列
 
        Args:
            from_user: 用户ID
        """
        # 检查是否已有线程在处理该用户的消息
        with self.lock:
            if from_user in self.active_users:
                logger.info(f"用户队列已在处理中: {from_user}")
                return
 
            self.active_users.add(from_user)
 
        # 启动用户专属处理线程
        worker_thread = threading.Thread(
            target=self._process_user_messages, args=(from_user,), daemon=True
        )
        worker_thread.start()
 
        with self.lock:
            self.worker_threads.add(worker_thread)
 
        logger.info(f"为用户启动消息处理线程: {from_user}")
 
    def _process_user_messages(self, from_user: str):
        """
        处理用户消息的工作线程
 
        Args:
            from_user: 用户ID
        """
        try:
            logger.info(f"开始处理用户消息队列: {from_user}")
 
            while self.running:
                # 检查用户是否正在处理中(防止并发)
                if redis_queue.is_processing(from_user):
                    logger.info(f"用户正在处理中,等待: {from_user}")
                    time.sleep(5)
                    continue
 
                # 设置处理状态
                if not redis_queue.set_processing_status(
                    from_user, True, settings.queue_timeout
                ):
                    logger.error(f"设置处理状态失败: {from_user}")
                    break
 
                try:
                    # 从队列中取出消息
                    message_data = redis_queue.dequeue_message(from_user, timeout=10)
 
                    if not message_data:
                        # 队列为空,清除处理状态并退出
                        redis_queue.set_processing_status(from_user, False)
                        logger.info(f"用户队列为空,结束处理: {from_user}")
                        break
 
                    # 处理消息
                    success = message_processor.process_single_message(message_data)
 
                    if success:
                        logger.info(f"消息处理成功: {from_user}")
                    else:
                        logger.error(f"消息处理失败: {from_user}")
                        # 可以考虑将失败的消息重新入队或记录到错误队列
 
                except Exception as e:
                    logger.error(f"处理用户消息异常: {from_user}, error={str(e)}")
 
                finally:
                    # 清除处理状态
                    redis_queue.set_processing_status(from_user, False)
 
                # 短暂休眠,避免过于频繁的处理
                time.sleep(1)
 
        except Exception as e:
            logger.error(f"用户消息处理线程异常: {from_user}, error={str(e)}")
 
        finally:
            # 清理活跃用户记录
            with self.lock:
                self.active_users.discard(from_user)
 
            logger.info(f"用户消息处理线程结束: {from_user}")
 
 
# 全局消息工作进程实例
message_worker = MessageWorker()