""" E云管家API客户端 """ import requests from typing import Optional, Dict, Any, List from loguru import logger from config import settings class ECloudClient: """E云管家API客户端""" def __init__(self): self.base_url = settings.ecloud_base_url.rstrip("/") self.headers = { "Content-Type": "application/json", "Authorization": settings.ecloud_authorization, } self.session = requests.Session() self.session.headers.update(self.headers) def get_contact_info(self, w_id: str, wc_id: str) -> Optional[Dict[str, Any]]: """ 获取联系人信息 Args: w_id: 登录实例标识 wc_id: 好友微信id/群id,多个使用英文逗号分隔 Returns: 联系人信息字典或列表,失败返回None - 单个wcId时返回字典 - 多个wcId时返回列表 """ try: url = f"{self.base_url}/getContact" payload = {"wId": w_id, "wcId": wc_id} logger.info(f"获取联系人信息: wId={w_id}, wcId={wc_id}") response = self.session.post(url, json=payload, timeout=30) response.raise_for_status() result = response.json() if result.get("code") == "1000": contact_data = result.get("data", []) if contact_data and len(contact_data) > 0: # 如果是单个wcId,返回第一个联系人信息 if "," not in wc_id: logger.info(f"成功获取联系人信息: wcId={wc_id}") return contact_data[0] else: # 如果是多个wcId,返回完整列表 logger.info(f"成功获取批量联系人信息: count={len(contact_data)}") return contact_data else: logger.warning(f"联系人信息为空: wcId={wc_id}") return None else: logger.error( f"获取联系人信息失败: wcId={wc_id}, code={result.get('code')}, message={result.get('message')}" ) return None except requests.exceptions.RequestException as e: logger.error(f"获取联系人信息网络错误: wcId={wc_id}, error={str(e)}") return None except Exception as e: logger.error(f"获取联系人信息异常: wcId={wc_id}, error={str(e)}") return None def send_text_message(self, w_id: str, wc_id: str, content: str, max_retries: int = None) -> bool: """ 发送文本消息 Args: w_id: 登录实例标识 wc_id: 接收人微信id/群id content: 文本内容消息 max_retries: 最大重试次数 Returns: 发送成功返回True,失败返回False """ if max_retries is None: from config import settings max_retries = settings.max_retry_count retry_count = 0 while retry_count <= max_retries: try: url = f"{self.base_url}/sendText" payload = {"wId": w_id, "wcId": wc_id, "content": content} logger.info( f"发送文本消息: wId={w_id}, wcId={wc_id}, content_length={len(content)}, retry={retry_count}" ) response = self.session.post(url, json=payload, timeout=30) response.raise_for_status() result = response.json() if result.get("code") == "1000": logger.info(f"文本消息发送成功: wcId={wc_id}") return True else: logger.error( f"文本消息发送失败: wcId={wc_id}, code={result.get('code')}, message={result.get('message')}" ) except requests.exceptions.RequestException as e: logger.error(f"发送文本消息网络错误: wcId={wc_id}, retry={retry_count}, error={str(e)}") except Exception as e: logger.error(f"发送文本消息异常: wcId={wc_id}, retry={retry_count}, error={str(e)}") retry_count += 1 if retry_count <= max_retries: from config import settings wait_time = settings.retry_delay * retry_count logger.info(f"等待重试: wcId={wc_id}, wait_time={wait_time}s") import time time.sleep(wait_time) logger.error( f"文本消息发送失败,已达最大重试次数: wcId={wc_id}, max_retries={max_retries}" ) return False def send_group_message(self, w_id: str, group_id: str, content: str) -> bool: """ 发送群聊消息 Args: w_id: 登录实例标识 group_id: 群id content: 文本内容消息 Returns: 发送成功返回True,失败返回False """ return self.send_text_message(w_id, group_id, content) def init_address_list(self, w_id: str) -> bool: """ 初始化通讯录列表 Args: w_id: 登录实例标识 Returns: 初始化成功返回True,失败返回False """ try: url = f"{self.base_url}/initAddressList" payload = {"wId": w_id} logger.info(f"初始化通讯录列表: wId={w_id}") response = self.session.post(url, json=payload, timeout=30) response.raise_for_status() result = response.json() if result.get("code") == "1000": logger.info(f"初始化通讯录列表成功: wId={w_id}") return True else: logger.error( f"初始化通讯录列表失败: wId={w_id}, code={result.get('code')}, message={result.get('message')}" ) return False except requests.exceptions.RequestException as e: logger.error(f"初始化通讯录列表网络错误: wId={w_id}, error={str(e)}") return False except Exception as e: logger.error(f"初始化通讯录列表异常: wId={w_id}, error={str(e)}") return False def get_address_list(self, w_id: str) -> Optional[Dict[str, Any]]: """ 获取通讯录列表 Args: w_id: 登录实例标识 Returns: 通讯录数据字典,失败返回None 返回格式: { "chatrooms": [...], # 群组列表 "friends": [...], # 好友列表 "ghs": [...], # 公众号列表 "others": [...] # 其他 } """ try: url = f"{self.base_url}/getAddressList" payload = {"wId": w_id} logger.info(f"获取通讯录列表: wId={w_id}") response = self.session.post(url, json=payload, timeout=30) response.raise_for_status() result = response.json() if result.get("code") == "1000": address_data = result.get("data", {}) logger.info(f"成功获取通讯录列表: wId={w_id}, friends_count={len(address_data.get('friends', []))}") return address_data else: logger.error( f"获取通讯录列表失败: wId={w_id}, code={result.get('code')}, message={result.get('message')}" ) return None except requests.exceptions.RequestException as e: logger.error(f"获取通讯录列表网络错误: wId={w_id}, error={str(e)}") return None except Exception as e: logger.error(f"获取通讯录列表异常: wId={w_id}, error={str(e)}") return None def send_group_at_message(self, w_id: str, wc_id: str, content: str, at_wc_ids: List[str], max_retries: int = None) -> bool: """ 发送群聊@消息 Args: w_id: 登录实例标识 wc_id: 接收方群id content: 文本内容消息(@的微信昵称需要自己拼接,必须拼接艾特符号,不然不生效) at_wc_ids: 艾特的微信id列表 max_retries: 最大重试次数 Returns: 发送成功返回True,失败返回False """ if max_retries is None: from config import settings max_retries = settings.max_retry_count retry_count = 0 while retry_count <= max_retries: try: url = f"{self.base_url}/sendText" # 将at_wc_ids列表用逗号拼接 at_str = ",".join(at_wc_ids) payload = { "wId": w_id, "wcId": wc_id, "content": content, "at": at_str } logger.info( f"发送群聊@消息: wId={w_id}, wcId={wc_id}, at={at_str}, content_length={len(content)}, retry={retry_count}" ) response = self.session.post(url, json=payload, timeout=30) response.raise_for_status() result = response.json() if result.get("code") == "1000": logger.info(f"群聊@消息发送成功: wcId={wc_id}, at={at_str}") return True else: logger.error( f"群聊@消息发送失败: wcId={wc_id}, at={at_str}, code={result.get('code')}, message={result.get('message')}" ) except requests.exceptions.RequestException as e: logger.error(f"发送群聊@消息网络错误: wcId={wc_id}, at={at_str}, retry={retry_count}, error={str(e)}") except Exception as e: logger.error(f"发送群聊@消息异常: wcId={wc_id}, at={at_str}, retry={retry_count}, error={str(e)}") retry_count += 1 if retry_count <= max_retries: from config import settings wait_time = settings.retry_delay * retry_count logger.info(f"等待重试: wcId={wc_id}, wait_time={wait_time}s") import time time.sleep(wait_time) logger.error( f"群聊@消息发送失败,已达最大重试次数: wcId={wc_id}, at={at_str}, max_retries={max_retries}" ) return False # 全局E云管家客户端实例 ecloud_client = ECloudClient()