"""
|
E云管家API客户端
|
"""
|
|
import requests
|
from typing import Optional, Dict, Any, List
|
from loguru import logger
|
from config import settings
|
|
|
class ECloudClient:
|
"""E云管家API客户端"""
|
|
def __init__(self):
|
self.base_url = settings.ecloud_base_url.rstrip("/")
|
self.headers = {
|
"Content-Type": "application/json",
|
"Authorization": settings.ecloud_authorization,
|
}
|
self.session = requests.Session()
|
self.session.headers.update(self.headers)
|
|
def _filter_keywords(self, content: str) -> str:
|
"""
|
过滤消息内容中的关键词
|
|
Args:
|
content: 原始消息内容
|
|
Returns:
|
过滤后的消息内容
|
"""
|
if not settings.keyword_filter_enabled or not settings.keyword_filter_keywords:
|
return content
|
|
filtered_content = content
|
for keyword in settings.keyword_filter_keywords:
|
if keyword in filtered_content:
|
filtered_content = filtered_content.replace(keyword, "")
|
logger.info(f"过滤关键词: {keyword}")
|
# 去除前后空格
|
filtered_content = filtered_content.strip()
|
return filtered_content
|
|
def get_contact_info(self, w_id: str, wc_id: str) -> Optional[Dict[str, Any]]:
|
"""
|
获取联系人信息
|
|
Args:
|
w_id: 登录实例标识
|
wc_id: 好友微信id/群id,多个使用英文逗号分隔
|
|
Returns:
|
联系人信息字典或列表,失败返回None
|
- 单个wcId时返回字典
|
- 多个wcId时返回列表
|
"""
|
try:
|
url = f"{self.base_url}/getContact"
|
payload = {"wId": w_id, "wcId": wc_id}
|
|
logger.info(f"获取联系人信息: wId={w_id}, wcId={wc_id}")
|
|
response = self.session.post(url, json=payload, timeout=30)
|
response.raise_for_status()
|
|
result = response.json()
|
|
if result.get("code") == "1000":
|
contact_data = result.get("data", [])
|
if contact_data and len(contact_data) > 0:
|
# 如果是单个wcId,返回第一个联系人信息
|
if "," not in wc_id:
|
logger.info(f"成功获取联系人信息: wcId={wc_id}")
|
return contact_data[0]
|
else:
|
# 如果是多个wcId,返回完整列表
|
logger.info(f"成功获取批量联系人信息: count={len(contact_data)}")
|
return contact_data
|
else:
|
logger.warning(f"联系人信息为空: wcId={wc_id}")
|
return None
|
else:
|
logger.error(
|
f"获取联系人信息失败: wcId={wc_id}, code={result.get('code')}, message={result.get('message')}"
|
)
|
return None
|
|
except requests.exceptions.RequestException as e:
|
logger.error(f"获取联系人信息网络错误: wcId={wc_id}, error={str(e)}")
|
return None
|
except Exception as e:
|
logger.error(f"获取联系人信息异常: wcId={wc_id}, error={str(e)}")
|
return None
|
|
def send_text_message(self, w_id: str, wc_id: str, content: str, max_retries: int = None) -> bool:
|
"""
|
发送文本消息
|
|
Args:
|
w_id: 登录实例标识
|
wc_id: 接收人微信id/群id
|
content: 文本内容消息
|
max_retries: 最大重试次数
|
|
Returns:
|
发送成功返回True,失败返回False
|
"""
|
# 过滤关键词
|
filtered_content = self._filter_keywords(content)
|
|
if max_retries is None:
|
from config import settings
|
max_retries = settings.max_retry_count
|
|
retry_count = 0
|
while retry_count <= max_retries:
|
try:
|
url = f"{self.base_url}/sendText"
|
payload = {"wId": w_id, "wcId": wc_id, "content": filtered_content}
|
|
logger.info(
|
f"发送文本消息: wId={w_id}, wcId={wc_id}, content_length={len(filtered_content)}, retry={retry_count}"
|
)
|
|
response = self.session.post(url, json=payload, timeout=30)
|
response.raise_for_status()
|
|
result = response.json()
|
|
if result.get("code") == "1000":
|
logger.info(f"文本消息发送成功: wcId={wc_id}")
|
return True
|
else:
|
logger.error(
|
f"文本消息发送失败: wcId={wc_id}, code={result.get('code')}, message={result.get('message')}"
|
)
|
|
except requests.exceptions.RequestException as e:
|
logger.error(f"发送文本消息网络错误: wcId={wc_id}, retry={retry_count}, error={str(e)}")
|
except Exception as e:
|
logger.error(f"发送文本消息异常: wcId={wc_id}, retry={retry_count}, error={str(e)}")
|
|
retry_count += 1
|
if retry_count <= max_retries:
|
from config import settings
|
wait_time = settings.retry_delay * retry_count
|
logger.info(f"等待重试: wcId={wc_id}, wait_time={wait_time}s")
|
import time
|
time.sleep(wait_time)
|
|
logger.error(
|
f"文本消息发送失败,已达最大重试次数: wcId={wc_id}, max_retries={max_retries}"
|
)
|
return False
|
|
def send_group_message(self, w_id: str, group_id: str, content: str) -> bool:
|
"""
|
发送群聊消息
|
|
Args:
|
w_id: 登录实例标识
|
group_id: 群id
|
content: 文本内容消息
|
|
Returns:
|
发送成功返回True,失败返回False
|
"""
|
# 过滤关键词
|
filtered_content = self._filter_keywords(content)
|
return self.send_text_message(w_id, group_id, filtered_content)
|
|
def init_address_list(self, w_id: str) -> bool:
|
"""
|
初始化通讯录列表
|
|
Args:
|
w_id: 登录实例标识
|
|
Returns:
|
初始化成功返回True,失败返回False
|
"""
|
try:
|
url = f"{self.base_url}/initAddressList"
|
payload = {"wId": w_id}
|
|
logger.info(f"初始化通讯录列表: wId={w_id}")
|
|
response = self.session.post(url, json=payload, timeout=30)
|
response.raise_for_status()
|
|
result = response.json()
|
|
if result.get("code") == "1000":
|
logger.info(f"初始化通讯录列表成功: wId={w_id}")
|
return True
|
else:
|
logger.error(
|
f"初始化通讯录列表失败: wId={w_id}, code={result.get('code')}, message={result.get('message')}"
|
)
|
return False
|
|
except requests.exceptions.RequestException as e:
|
logger.error(f"初始化通讯录列表网络错误: wId={w_id}, error={str(e)}")
|
return False
|
except Exception as e:
|
logger.error(f"初始化通讯录列表异常: wId={w_id}, error={str(e)}")
|
return False
|
|
def get_address_list(self, w_id: str) -> Optional[Dict[str, Any]]:
|
"""
|
获取通讯录列表
|
|
Args:
|
w_id: 登录实例标识
|
|
Returns:
|
通讯录数据字典,失败返回None
|
返回格式: {
|
"chatrooms": [...], # 群组列表
|
"friends": [...], # 好友列表
|
"ghs": [...], # 公众号列表
|
"others": [...] # 其他
|
}
|
"""
|
try:
|
url = f"{self.base_url}/getAddressList"
|
payload = {"wId": w_id}
|
|
logger.info(f"获取通讯录列表: wId={w_id}")
|
|
response = self.session.post(url, json=payload, timeout=30)
|
response.raise_for_status()
|
|
result = response.json()
|
|
if result.get("code") == "1000":
|
address_data = result.get("data", {})
|
logger.info(f"成功获取通讯录列表: wId={w_id}, friends_count={len(address_data.get('friends', []))}")
|
return address_data
|
else:
|
logger.error(
|
f"获取通讯录列表失败: wId={w_id}, code={result.get('code')}, message={result.get('message')}"
|
)
|
return None
|
|
except requests.exceptions.RequestException as e:
|
logger.error(f"获取通讯录列表网络错误: wId={w_id}, error={str(e)}")
|
return None
|
except Exception as e:
|
logger.error(f"获取通讯录列表异常: wId={w_id}, error={str(e)}")
|
return None
|
|
def send_group_at_message(self, w_id: str, wc_id: str, content: str, at_wc_ids: List[str], max_retries: int = None) -> bool:
|
"""
|
发送群聊@消息
|
|
Args:
|
w_id: 登录实例标识
|
wc_id: 接收方群id
|
content: 文本内容消息(@的微信昵称需要自己拼接,必须拼接艾特符号,不然不生效)
|
at_wc_ids: 艾特的微信id列表
|
max_retries: 最大重试次数
|
|
Returns:
|
发送成功返回True,失败返回False
|
"""
|
# 过滤关键词
|
filtered_content = self._filter_keywords(content)
|
|
if max_retries is None:
|
from config import settings
|
max_retries = settings.max_retry_count
|
|
retry_count = 0
|
while retry_count <= max_retries:
|
try:
|
url = f"{self.base_url}/sendText"
|
# 将at_wc_ids列表用逗号拼接
|
at_str = ",".join(at_wc_ids)
|
payload = {
|
"wId": w_id,
|
"wcId": wc_id,
|
"content": filtered_content,
|
"at": at_str
|
}
|
|
logger.info(
|
f"发送群聊@消息: wId={w_id}, wcId={wc_id}, at={at_str}, content_length={len(filtered_content)}, retry={retry_count}"
|
)
|
|
response = self.session.post(url, json=payload, timeout=30)
|
response.raise_for_status()
|
|
result = response.json()
|
|
if result.get("code") == "1000":
|
logger.info(f"群聊@消息发送成功: wcId={wc_id}, at={at_str}")
|
return True
|
else:
|
logger.error(
|
f"群聊@消息发送失败: wcId={wc_id}, at={at_str}, code={result.get('code')}, message={result.get('message')}"
|
)
|
|
except requests.exceptions.RequestException as e:
|
logger.error(f"发送群聊@消息网络错误: wcId={wc_id}, at={at_str}, retry={retry_count}, error={str(e)}")
|
except Exception as e:
|
logger.error(f"发送群聊@消息异常: wcId={wc_id}, at={at_str}, retry={retry_count}, error={str(e)}")
|
|
retry_count += 1
|
if retry_count <= max_retries:
|
from config import settings
|
wait_time = settings.retry_delay * retry_count
|
logger.info(f"等待重试: wcId={wc_id}, wait_time={wait_time}s")
|
import time
|
time.sleep(wait_time)
|
|
logger.error(
|
f"群聊@消息发送失败,已达最大重试次数: wcId={wc_id}, at={at_str}, max_retries={max_retries}"
|
)
|
return False
|
|
def query_online_wechat_list(self) -> Optional[List[Dict[str, str]]]:
|
"""
|
查询账号中在线的微信列表
|
|
Returns:
|
在线微信列表,每个元素包含wcId和wId,失败返回None
|
返回格式: [
|
{
|
"wcId": "wxid_i6qsbbjenju2",
|
"wId": "72223018-7f2a-4f4f-bfa3-26e47dbd61"
|
}
|
]
|
"""
|
try:
|
url = f"{self.base_url}/queryLoginWx"
|
payload = {}
|
|
logger.info("查询在线微信列表")
|
|
response = self.session.post(url, json=payload, timeout=30)
|
response.raise_for_status()
|
|
result = response.json()
|
|
if result.get("code") == "1000":
|
online_list = result.get("data", [])
|
logger.info(f"成功查询在线微信列表: count={len(online_list)}")
|
return online_list
|
else:
|
logger.warning(
|
f"查询在线微信列表失败: code={result.get('code')}, message={result.get('message')}"
|
)
|
return []
|
|
except requests.exceptions.RequestException as e:
|
logger.error(f"查询在线微信列表网络错误: error={str(e)}")
|
return None
|
except Exception as e:
|
logger.error(f"查询在线微信列表异常: error={str(e)}")
|
return None
|
|
|
# 全局E云管家客户端实例
|
ecloud_client = ECloudClient()
|