import xml.etree.ElementTree as ET
|
from wecom.WXBizMsgCrypt import WXBizMsgCrypt
|
import logging
|
import time
|
from redis_manager import RedisManager
|
import json
|
from config_manager import global_config
|
|
|
# Module-level configuration object shared by the helpers below; WeComUtils
# reads WECOM_BOT_TOKEN and WECOM_BOT_AES_KEY from it.
config = global_config.get_config()
|
|
|
class WeComUtils:
    """Static helpers for handling WeCom bot callbacks.

    Bundles thin wrappers around ``WXBizMsgCrypt`` (URL verification, message
    decryption/encryption), XML message parsing, stream-response construction
    backed by ``RedisManager``, and byte-bounded string splitting.
    """

    @staticmethod
    def _new_crypt():
        """Build a ``WXBizMsgCrypt`` from the bot token and AES key in ``config``."""
        # The third constructor argument is deliberately passed as an empty
        # string, exactly as every call site did before this helper existed.
        return WXBizMsgCrypt(config.WECOM_BOT_TOKEN, config.WECOM_BOT_AES_KEY, '')

    @staticmethod
    def verify_url(timestamp, nonce, msg_encrypt, signature):
        """Verify a callback URL request via ``WXBizMsgCrypt.VerifyURL``.

        Returns:
            The ``(ret, reply_echostr)`` pair exactly as returned by the SDK.
        """
        # Note the SDK takes the signature first, unlike this method's signature.
        return WeComUtils._new_crypt().VerifyURL(signature, timestamp, nonce, msg_encrypt)

    @staticmethod
    def decrypt_msg(post_data, msg_signature, time_stamp, nonce):
        """Decrypt a callback body via ``WXBizMsgCrypt.DecryptMsg``.

        Returns:
            The ``(ret, xml_content)`` pair exactly as returned by the SDK.
        """
        return WeComUtils._new_crypt().DecryptMsg(post_data, msg_signature, time_stamp, nonce)

    @staticmethod
    def encrypt_msg(content, nonce, timestamp):
        """Encrypt a reply via ``WXBizMsgCrypt.EncryptMsg``.

        Returns:
            The ``(ret, encrypt_msg)`` pair exactly as returned by the SDK.
        """
        return WeComUtils._new_crypt().EncryptMsg(content, nonce, timestamp)

    @staticmethod
    def parse_bot_message(xml_str):
        """Parse a bot message XML document into a dict.

        Required elements: ``ToUserName``, ``FromUserName``, ``CreateTime``,
        ``MsgType``, ``MsgId``. Optional elements ``Content`` and ``AgentID``
        default to ``""`` when absent. ``ChatId`` and ``List`` (the text of
        each child of ``<List>``) are only included when present.

        Raises:
            xml.etree.ElementTree.ParseError: if ``xml_str`` is not well-formed.
            AttributeError: if a required element is missing.

        Any failure is logged together with the raw XML and then re-raised.
        """
        # NOTE(review): ElementTree is not hardened against maliciously
        # constructed XML; confirm xml_str only comes from verified callbacks.
        try:
            root = ET.fromstring(xml_str)

            def required(tag):
                node = root.find(tag)
                if node is None:
                    # AttributeError is kept for backward compatibility: the
                    # previous `root.find(tag).text` raised it implicitly,
                    # just without naming the missing element.
                    raise AttributeError(f"missing required XML element <{tag}>")
                return node.text

            def optional(tag):
                node = root.find(tag)
                return node.text if node is not None else ""

            result = {
                "ToUserName": required('ToUserName'),
                "FromUserName": required('FromUserName'),
                "CreateTime": required('CreateTime'),
                "MsgType": required('MsgType'),
                "Content": optional('Content'),
                "MsgId": required('MsgId'),
                "AgentID": optional('AgentID'),
            }

            # Bot-specific fields.
            chat_id = root.find('ChatId')
            if chat_id is not None:
                result["ChatId"] = chat_id.text

            menu = root.find('List')  # possibly a menu message
            if menu is not None:
                result["List"] = [item.text for item in menu]

            return result
        except Exception as e:
            logging.error("XML parsing error: %s", e)
            logging.error("XML content: %s", xml_str)
            raise

    @staticmethod
    def generate_stream_id(user_id, msg_id):
        """Return the stream identifier ``"{user_id}-{msg_id}"``."""
        return f"{user_id}-{msg_id}"

    @staticmethod
    def create_response_message(stream_id, first=False):
        """Build the JSON body of a ``stream`` reply.

        Args:
            stream_id: Identifier placed in ``stream.id`` and used as the
                lookup key for ``RedisManager``.
            first: When true, return an empty, unfinished message without
                touching Redis.

        Returns:
            A JSON string (non-ASCII characters are not escaped), or ``None``
            when ``first`` is false and no chunks exist for ``stream_id``.
        """
        content = ""
        finish = False

        # The first reply is returned as-is; later replies carry a stored chunk.
        if not first:
            redis_manager = RedisManager()
            if not redis_manager.exists_stream_chunks(stream_id):
                return None

            content = redis_manager.get_stream_chunk(stream_id)
            # If nothing remains after fetching, this was the final chunk
            # (presumably get_stream_chunk consumes the chunk - TODO confirm).
            finish = not redis_manager.exists_stream_chunks(stream_id)

        response_message = {
            "msgtype": "stream",
            "stream": {
                "id": stream_id,
                "finish": finish,
                "content": content,
            },
        }
        return json.dumps(response_message, ensure_ascii=False)

    @staticmethod
    def split_string_safely(s, max_bytes=20480, encoding='utf-8'):
        """Split ``s`` into pieces whose encoded size is at most ``max_bytes``.

        Multi-byte characters are never cut in half: when a byte window ends
        inside a character, the window is shrunk until it decodes cleanly.

        Args:
            s: Text to split.
            max_bytes: Maximum encoded size of each piece; must be >= 1.
            encoding: Codec used to measure and split the text.

        Returns:
            List of string pieces that concatenate back to ``s``; ``[]`` for
            an empty string.

        Raises:
            ValueError: if ``max_bytes`` is < 1, or is too small to hold a
                single encoded character (previously this looped forever).
        """
        if max_bytes < 1:
            raise ValueError("max_bytes must be a positive integer")

        encoded = s.encode(encoding)
        total = len(encoded)
        chunks = []
        start = 0

        while start < total:
            end = min(start + max_bytes, total)

            while True:
                if end <= start:
                    # Backed off to an empty window: not even one character
                    # fits. Fail loudly instead of appending '' forever.
                    raise ValueError(
                        f"max_bytes={max_bytes} is too small to hold a single "
                        f"{encoding}-encoded character"
                    )
                try:
                    piece = encoded[start:end].decode(encoding)
                    break
                except UnicodeDecodeError:
                    # The window cut a multi-byte character; drop one byte and retry.
                    end -= 1

            chunks.append(piece)
            start = end  # the next piece begins where this one ended

        return chunks
|