import json
import logging
import time
import xml.etree.ElementTree as ET

from config_manager import global_config
from redis_manager import RedisManager
from wecom.WXBizMsgCrypt import WXBizMsgCrypt

config = global_config.get_config()


class WeComUtils:
    """Stateless helpers for the WeCom bot callback flow.

    Groups thin wrappers around ``WXBizMsgCrypt`` (URL verification, message
    decryption/encryption), XML parsing of incoming bot messages, construction
    of "stream" response payloads backed by ``RedisManager``, and a
    byte-bounded string splitter. Every method is a ``@staticmethod``.
    """

    @staticmethod
    def _new_crypt():
        """Return a ``WXBizMsgCrypt`` built from the configured token and AES key."""
        # The third constructor argument is an empty string at every call site
        # (presumably the receive id -- confirm against WXBizMsgCrypt).
        return WXBizMsgCrypt(config.WECOM_BOT_TOKEN, config.WECOM_BOT_AES_KEY, '')

    @staticmethod
    def verify_url(timestamp, nonce, msg_encrypt, signature):
        """Run ``WXBizMsgCrypt.VerifyURL`` on the callback-verification parameters.

        Returns:
            The ``(ret, reply_echostr)`` pair produced by ``VerifyURL``.
        """
        ret, reply_echostr = WeComUtils._new_crypt().VerifyURL(
            signature, timestamp, nonce, msg_encrypt
        )
        return ret, reply_echostr

    @staticmethod
    def decrypt_msg(post_data, msg_signature, time_stamp, nonce):
        """Run ``WXBizMsgCrypt.DecryptMsg`` on an incoming request body.

        Returns:
            The ``(ret, xml_content)`` pair produced by ``DecryptMsg``.
        """
        ret, xml_content = WeComUtils._new_crypt().DecryptMsg(
            post_data, msg_signature, time_stamp, nonce
        )
        return ret, xml_content

    @staticmethod
    def encrypt_msg(content, nonce, timestamp):
        """Run ``WXBizMsgCrypt.EncryptMsg`` on an outgoing payload.

        Returns:
            The ``(ret, encrypt_msg)`` pair produced by ``EncryptMsg``.
        """
        ret, encrypt_msg = WeComUtils._new_crypt().EncryptMsg(content, nonce, timestamp)
        return ret, encrypt_msg

    @staticmethod
    def parse_bot_message(xml_str):
        """Parse a bot message XML document into a dict.

        ``ToUserName``, ``FromUserName``, ``CreateTime``, ``MsgType`` and
        ``MsgId`` are required: a missing element raises ``AttributeError``.
        ``Content`` and ``AgentID`` default to ``""`` when the element is
        absent. ``ChatId`` and ``List`` are added only when present; ``List``
        holds the text of each child of the ``List`` element.

        Raises:
            Exception: any parsing or lookup error is logged together with the
                raw XML and then re-raised unchanged.
        """
        try:
            # NOTE(review): xml.etree.ElementTree is not hardened against
            # maliciously constructed XML; input is presumably the output of
            # decrypt_msg -- confirm against callers.
            root = ET.fromstring(xml_str)

            def required(tag):
                # Intentionally fails with AttributeError when the tag is missing.
                return root.find(tag).text

            def optional(tag):
                node = root.find(tag)
                return node.text if node is not None else ""

            result = {
                "ToUserName": required('ToUserName'),
                "FromUserName": required('FromUserName'),
                "CreateTime": required('CreateTime'),
                "MsgType": required('MsgType'),
                "Content": optional('Content'),
                "MsgId": required('MsgId'),
                "AgentID": optional('AgentID'),
            }

            # Bot-specific fields.
            chat_id = root.find('ChatId')
            if chat_id is not None:
                result["ChatId"] = chat_id.text

            # Possibly a menu message.
            items = root.find('List')
            if items is not None:
                result["List"] = [item.text for item in items]

            return result
        except Exception as e:
            logging.error("XML parsing error: %s", e)
            logging.error("XML content: %s", xml_str)
            raise

    @staticmethod
    def generate_stream_id(user_id, msg_id):
        """Return the stream identifier ``"{user_id}-{msg_id}"``."""
        return f"{user_id}-{msg_id}"

    @staticmethod
    def create_response_message(stream_id, first=False):
        """Build the JSON "stream" response payload for ``stream_id``.

        Args:
            stream_id: Identifier placed in the payload and used as the lookup
                key for ``RedisManager``.
            first: When true, no lookup is made and an empty, unfinished
                payload is returned.

        Returns:
            A JSON string (non-ASCII characters kept as-is), or ``None`` when
            ``first`` is false and no chunks exist for ``stream_id``.
        """
        response_message = {
            "msgtype": "stream",
            "stream": {
                "id": stream_id,
                "finish": False,
                "content": "",
            },
        }

        # Only follow-up requests read a chunk; the first response stays empty.
        if not first:
            redis_manager = RedisManager()

            # Nothing stored for this stream.
            if not redis_manager.exists_stream_chunks(stream_id):
                return None

            content = redis_manager.get_stream_chunk(stream_id)

            # Re-check after fetching: if nothing remains, that was the last chunk.
            stream = response_message['stream']
            stream['finish'] = not redis_manager.exists_stream_chunks(stream_id)
            stream['content'] = content

        return json.dumps(response_message, ensure_ascii=False)

    @staticmethod
    def split_string_safely(s, max_bytes=20480, encoding='utf-8'):
        """Split ``s`` into pieces whose encoded size is at most ``max_bytes``.

        The string is encoded once and cut every ``max_bytes`` bytes; when a
        cut lands inside a multi-byte character the cut is moved back one byte
        at a time until the piece decodes cleanly. Joining the returned pieces
        reproduces ``s``.

        Args:
            s: Text to split.
            max_bytes: Upper bound on the encoded size of each piece.
            encoding: Codec used for measuring and cutting.

        Returns:
            A list of strings; empty when ``s`` is empty.

        Raises:
            ValueError: if ``max_bytes`` is not positive, or is too small to
                hold a single encoded character (either case previously made
                the loop spin forever without advancing).
        """
        if max_bytes < 1:
            raise ValueError("max_bytes must be a positive integer")

        encoded = s.encode(encoding)
        total = len(encoded)
        chunks = []
        start = 0

        while start < total:
            end = min(start + max_bytes, total)

            while True:
                if end <= start:
                    # Backed off to an empty slice: not even one character fits.
                    raise ValueError(
                        f"max_bytes={max_bytes} is too small to hold a single "
                        f"encoded character"
                    )
                try:
                    piece = encoded[start:end].decode(encoding)
                    break
                except UnicodeDecodeError:
                    # The cut split a multi-byte character; back off one byte.
                    end -= 1

            chunks.append(piece)
            start = end  # the next piece begins where this one ended

        return chunks