From 1225b6cbf0a028b765a0ab6d784bcb80459a67bb Mon Sep 17 00:00:00 2001 From: yj <2077506045@qq.com> Date: 星期三, 23 七月 2025 17:59:54 +0800 Subject: [PATCH] 功能更新 --- app/workers/message_worker.py | 62 +++++++++++++++++++++++++++++-- 1 files changed, 58 insertions(+), 4 deletions(-) diff --git a/app/workers/message_worker.py b/app/workers/message_worker.py index 21a28a3..bb06b8d 100644 --- a/app/workers/message_worker.py +++ b/app/workers/message_worker.py @@ -47,10 +47,34 @@ """鐩戞帶闃熷垪锛屼负鏈夋秷鎭殑鐢ㄦ埛鍚姩澶勭悊绾跨▼""" while self.running: try: - # 杩欓噷鍙互瀹炵幇鏇村鏉傜殑闃熷垪鍙戠幇鏈哄埗 - # 鐩墠绠�鍖栦负妫�鏌ュ凡鐭ョ殑娲昏穬鐢ㄦ埛 - # 鍦ㄥ疄闄呭簲鐢ㄤ腑锛屽彲浠ラ�氳繃Redis鐨凷CAN鍛戒护鎵弿鎵�鏈夐槦鍒� - + # 浣跨敤Redis鐨凷CAN鍛戒护鎵弿鎵�鏈夐槦鍒楅敭 + cursor = 0 + queue_keys = set() + + while self.running: + cursor, keys = self._scan_queue_keys(cursor) + queue_keys.update(keys) + + # 濡傛灉娓告爣涓�0锛岃〃绀烘壂鎻忓畬鎴� + if cursor == 0: + break + + # 涓烘瘡涓湁娑堟伅鐨勯槦鍒楀惎鍔ㄥ鐞嗙嚎绋� + for queue_key in queue_keys: + if not self.running: + break + + # 浠庨槦鍒楅敭涓彁鍙栫敤鎴稩D + if queue_key.startswith(redis_queue.queue_prefix): + from_user = queue_key[len(redis_queue.queue_prefix):] + + # 妫�鏌ラ槦鍒楁槸鍚︽湁娑堟伅 + queue_length = redis_queue.get_queue_length(from_user) + if queue_length > 0: + logger.info(f"鍙戠幇鐢ㄦ埛闃熷垪鏈夋秷鎭�: user={from_user}, length={queue_length}") + # 鍚姩鐢ㄦ埛闃熷垪澶勭悊 + self.process_user_queue(from_user) + # 娓呯悊宸插畬鎴愮殑绾跨▼ self._cleanup_finished_threads() @@ -60,6 +84,29 @@ except Exception as e: logger.error(f"闃熷垪鐩戞帶寮傚父: {str(e)}") time.sleep(10) + + def _scan_queue_keys(self, cursor: int = 0, count: int = 100) -> tuple: + """ + 鎵弿闃熷垪閿� + + Args: + cursor: 鎵弿娓告爣 + count: 姣忔鎵弿鐨勯敭鏁伴噺 + + Returns: + (鏂版父鏍�, 闃熷垪閿垪琛�) + """ + try: + # 浣跨敤SCAN鍛戒护鎵弿鍖归厤妯″紡鐨勯敭 + new_cursor, keys = redis_queue.redis_client.scan( + cursor=cursor, + match=f"{redis_queue.queue_prefix}*", + count=count + ) + return new_cursor, keys + except Exception as e: + logger.error(f"鎵弿闃熷垪閿け璐�: {str(e)}") + return 0, [] def _cleanup_finished_threads(self): """娓呯悊宸插畬鎴愮殑绾跨▼""" @@ -102,6 +149,10 @@ """ try: logger.info(f"寮�濮嬪鐞嗙敤鎴锋秷鎭槦鍒�: {from_user}") + + # 鑾峰彇闃熷垪闀垮害 + queue_length = redis_queue.get_queue_length(from_user) + logger.info(f"鐢ㄦ埛闃熷垪鍒濆闀垮害: {from_user}, length={queue_length}") while self.running: # 妫�鏌ョ敤鎴锋槸鍚︽鍦ㄥ鐞嗕腑锛堥槻姝㈠苟鍙戯級 @@ -132,6 +183,9 @@ if success: logger.info(f"娑堟伅澶勭悊鎴愬姛: {from_user}") + # 鏇存柊闃熷垪闀垮害淇℃伅 + queue_length = redis_queue.get_queue_length(from_user) + logger.info(f"鐢ㄦ埛闃熷垪鍓╀綑闀垮害: {from_user}, length={queue_length}") else: logger.error(f"娑堟伅澶勭悊澶辫触: {from_user}") # 鍙互鑰冭檻灏嗗け璐ョ殑娑堟伅閲嶆柊鍏ラ槦鎴栬褰曞埌閿欒闃熷垪 -- Gitblit v1.9.1