yj
2025-07-23 1225b6cbf0a028b765a0ab6d784bcb80459a67bb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
"""
后台消息处理工作进程
"""
 
import time
import threading
from typing import Set
from loguru import logger
from config import settings
from app.services.redis_queue import redis_queue
from app.services.message_processor import message_processor
 
 
class MessageWorker:
    """消息处理工作进程"""
 
    def __init__(self):
        self.running = False
        self.worker_threads: Set[threading.Thread] = set()
        self.active_users: Set[str] = set()
        self.lock = threading.Lock()
 
    def start(self):
        """启动工作进程"""
        self.running = True
        logger.info("消息处理工作进程已启动")
 
        # 启动主监控线程
        monitor_thread = threading.Thread(target=self._monitor_queues, daemon=True)
        monitor_thread.start()
 
        logger.info("队列监控线程已启动")
 
    def stop(self):
        """停止工作进程"""
        self.running = False
        logger.info("正在停止消息处理工作进程...")
 
        # 等待所有工作线程完成
        for thread in self.worker_threads:
            if thread.is_alive():
                thread.join(timeout=10)
 
        logger.info("消息处理工作进程已停止")
 
    def _monitor_queues(self):
        """监控队列,为有消息的用户启动处理线程"""
        while self.running:
            try:
                # 使用Redis的SCAN命令扫描所有队列键
                cursor = 0
                queue_keys = set()
                
                while self.running:
                    cursor, keys = self._scan_queue_keys(cursor)
                    queue_keys.update(keys)
                    
                    # 如果游标为0,表示扫描完成
                    if cursor == 0:
                        break
                
                # 为每个有消息的队列启动处理线程
                for queue_key in queue_keys:
                    if not self.running:
                        break
                    
                    # 从队列键中提取用户ID
                    if queue_key.startswith(redis_queue.queue_prefix):
                        from_user = queue_key[len(redis_queue.queue_prefix):]
                        
                        # 检查队列是否有消息
                        queue_length = redis_queue.get_queue_length(from_user)
                        if queue_length > 0:
                            logger.info(f"发现用户队列有消息: user={from_user}, length={queue_length}")
                            # 启动用户队列处理
                            self.process_user_queue(from_user)
                
                # 清理已完成的线程
                self._cleanup_finished_threads()
 
                # 休眠一段时间再检查
                time.sleep(5)
 
            except Exception as e:
                logger.error(f"队列监控异常: {str(e)}")
                time.sleep(10)
    
    def _scan_queue_keys(self, cursor: int = 0, count: int = 100) -> tuple:
        """
        扫描队列键
        
        Args:
            cursor: 扫描游标
            count: 每次扫描的键数量
            
        Returns:
            (新游标, 队列键列表)
        """
        try:
            # 使用SCAN命令扫描匹配模式的键
            new_cursor, keys = redis_queue.redis_client.scan(
                cursor=cursor,
                match=f"{redis_queue.queue_prefix}*",
                count=count
            )
            return new_cursor, keys
        except Exception as e:
            logger.error(f"扫描队列键失败: {str(e)}")
            return 0, []
 
    def _cleanup_finished_threads(self):
        """清理已完成的线程"""
        with self.lock:
            finished_threads = {t for t in self.worker_threads if not t.is_alive()}
            self.worker_threads -= finished_threads
 
    def process_user_queue(self, from_user: str):
        """
        处理指定用户的消息队列
 
        Args:
            from_user: 用户ID
        """
        # 检查是否已有线程在处理该用户的消息
        with self.lock:
            if from_user in self.active_users:
                logger.info(f"用户队列已在处理中: {from_user}")
                return
 
            self.active_users.add(from_user)
 
        # 启动用户专属处理线程
        worker_thread = threading.Thread(
            target=self._process_user_messages, args=(from_user,), daemon=True
        )
        worker_thread.start()
 
        with self.lock:
            self.worker_threads.add(worker_thread)
 
        logger.info(f"为用户启动消息处理线程: {from_user}")
 
    def _process_user_messages(self, from_user: str):
        """
        处理用户消息的工作线程
 
        Args:
            from_user: 用户ID
        """
        try:
            logger.info(f"开始处理用户消息队列: {from_user}")
            
            # 获取队列长度
            queue_length = redis_queue.get_queue_length(from_user)
            logger.info(f"用户队列初始长度: {from_user}, length={queue_length}")
 
            while self.running:
                # 检查用户是否正在处理中(防止并发)
                if redis_queue.is_processing(from_user):
                    logger.info(f"用户正在处理中,等待: {from_user}")
                    time.sleep(5)
                    continue
 
                # 设置处理状态
                if not redis_queue.set_processing_status(
                    from_user, True, settings.queue_timeout
                ):
                    logger.error(f"设置处理状态失败: {from_user}")
                    break
 
                try:
                    # 从队列中取出消息
                    message_data = redis_queue.dequeue_message(from_user, timeout=10)
 
                    if not message_data:
                        # 队列为空,清除处理状态并退出
                        redis_queue.set_processing_status(from_user, False)
                        logger.info(f"用户队列为空,结束处理: {from_user}")
                        break
 
                    # 处理消息
                    success = message_processor.process_single_message(message_data)
 
                    if success:
                        logger.info(f"消息处理成功: {from_user}")
                        # 更新队列长度信息
                        queue_length = redis_queue.get_queue_length(from_user)
                        logger.info(f"用户队列剩余长度: {from_user}, length={queue_length}")
                    else:
                        logger.error(f"消息处理失败: {from_user}")
                        # 可以考虑将失败的消息重新入队或记录到错误队列
 
                except Exception as e:
                    logger.error(f"处理用户消息异常: {from_user}, error={str(e)}")
 
                finally:
                    # 清除处理状态
                    redis_queue.set_processing_status(from_user, False)
 
                # 短暂休眠,避免过于频繁的处理
                time.sleep(1)
 
        except Exception as e:
            logger.error(f"用户消息处理线程异常: {from_user}, error={str(e)}")
 
        finally:
            # 清理活跃用户记录
            with self.lock:
                self.active_users.discard(from_user)
 
            logger.info(f"用户消息处理线程结束: {from_user}")
 
 
# 全局消息工作进程实例
message_worker = MessageWorker()