From 1225b6cbf0a028b765a0ab6d784bcb80459a67bb Mon Sep 17 00:00:00 2001
From: yj <2077506045@qq.com>
Date: 星期三, 23 七月 2025 17:59:54 +0800
Subject: [PATCH] 功能更新

---
 app/workers/message_worker.py |   62 +++++++++++++++++++++++++++++--
 1 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/app/workers/message_worker.py b/app/workers/message_worker.py
index 21a28a3..bb06b8d 100644
--- a/app/workers/message_worker.py
+++ b/app/workers/message_worker.py
@@ -47,10 +47,34 @@
         """鐩戞帶闃熷垪锛屼负鏈夋秷鎭殑鐢ㄦ埛鍚姩澶勭悊绾跨▼"""
         while self.running:
             try:
-                # 杩欓噷鍙互瀹炵幇鏇村鏉傜殑闃熷垪鍙戠幇鏈哄埗
-                # 鐩墠绠�鍖栦负妫�鏌ュ凡鐭ョ殑娲昏穬鐢ㄦ埛
-                # 鍦ㄥ疄闄呭簲鐢ㄤ腑锛屽彲浠ラ�氳繃Redis鐨凷CAN鍛戒护鎵弿鎵�鏈夐槦鍒�
-
+                # 浣跨敤Redis鐨凷CAN鍛戒护鎵弿鎵�鏈夐槦鍒楅敭
+                cursor = 0
+                queue_keys = set()
+                
+                while self.running:
+                    cursor, keys = self._scan_queue_keys(cursor)
+                    queue_keys.update(keys)
+                    
+                    # 濡傛灉娓告爣涓�0锛岃〃绀烘壂鎻忓畬鎴�
+                    if cursor == 0:
+                        break
+                
+                # 涓烘瘡涓湁娑堟伅鐨勯槦鍒楀惎鍔ㄥ鐞嗙嚎绋�
+                for queue_key in queue_keys:
+                    if not self.running:
+                        break
+                    
+                    # 浠庨槦鍒楅敭涓彁鍙栫敤鎴稩D
+                    if queue_key.startswith(redis_queue.queue_prefix):
+                        from_user = queue_key[len(redis_queue.queue_prefix):]
+                        
+                        # 妫�鏌ラ槦鍒楁槸鍚︽湁娑堟伅
+                        queue_length = redis_queue.get_queue_length(from_user)
+                        if queue_length > 0:
+                            logger.info(f"鍙戠幇鐢ㄦ埛闃熷垪鏈夋秷鎭�: user={from_user}, length={queue_length}")
+                            # 鍚姩鐢ㄦ埛闃熷垪澶勭悊
+                            self.process_user_queue(from_user)
+                
                 # 娓呯悊宸插畬鎴愮殑绾跨▼
                 self._cleanup_finished_threads()
 
@@ -60,6 +84,29 @@
             except Exception as e:
                 logger.error(f"闃熷垪鐩戞帶寮傚父: {str(e)}")
                 time.sleep(10)
+    
+    def _scan_queue_keys(self, cursor: int = 0, count: int = 100) -> tuple:
+        """
+        鎵弿闃熷垪閿�
+        
+        Args:
+            cursor: 鎵弿娓告爣
+            count: 姣忔鎵弿鐨勯敭鏁伴噺
+            
+        Returns:
+            (鏂版父鏍�, 闃熷垪閿垪琛�)
+        """
+        try:
+            # 浣跨敤SCAN鍛戒护鎵弿鍖归厤妯″紡鐨勯敭
+            new_cursor, keys = redis_queue.redis_client.scan(
+                cursor=cursor,
+                match=f"{redis_queue.queue_prefix}*",
+                count=count
+            )
+            return new_cursor, keys
+        except Exception as e:
+            logger.error(f"鎵弿闃熷垪閿け璐�: {str(e)}")
+            return 0, []
 
     def _cleanup_finished_threads(self):
         """娓呯悊宸插畬鎴愮殑绾跨▼"""
@@ -102,6 +149,10 @@
         """
         try:
             logger.info(f"寮�濮嬪鐞嗙敤鎴锋秷鎭槦鍒�: {from_user}")
+            
+            # 鑾峰彇闃熷垪闀垮害
+            queue_length = redis_queue.get_queue_length(from_user)
+            logger.info(f"鐢ㄦ埛闃熷垪鍒濆闀垮害: {from_user}, length={queue_length}")
 
             while self.running:
                 # 妫�鏌ョ敤鎴锋槸鍚︽鍦ㄥ鐞嗕腑锛堥槻姝㈠苟鍙戯級
@@ -132,6 +183,9 @@
 
                     if success:
                         logger.info(f"娑堟伅澶勭悊鎴愬姛: {from_user}")
+                        # 鏇存柊闃熷垪闀垮害淇℃伅
+                        queue_length = redis_queue.get_queue_length(from_user)
+                        logger.info(f"鐢ㄦ埛闃熷垪鍓╀綑闀垮害: {from_user}, length={queue_length}")
                     else:
                         logger.error(f"娑堟伅澶勭悊澶辫触: {from_user}")
                         # 鍙互鑰冭檻灏嗗け璐ョ殑娑堟伅閲嶆柊鍏ラ槦鎴栬褰曞埌閿欒闃熷垪

--
Gitblit v1.9.1