""" 联系人同步服务 """ from typing import List, Optional from loguru import logger from sqlalchemy.orm import Session from app.models.contact import Contact from app.models.database import get_db from app.services.ecloud_client import ecloud_client from app.services.friend_ignore_service import friend_ignore_service class ContactSyncService: """联系人同步服务""" def __init__(self): pass def sync_contacts_on_startup(self, w_id: str) -> bool: """ 启动时同步联系人信息 Args: w_id: 登录实例标识 Returns: 同步成功返回True,失败返回False """ try: logger.info(f"开始同步联系人信息: wId={w_id}") # 1. 初始化通讯录列表 if not ecloud_client.init_address_list(w_id): logger.error(f"初始化通讯录列表失败: wId={w_id}") return False # 2. 获取通讯录列表 address_data = ecloud_client.get_address_list(w_id) if not address_data: logger.error(f"获取通讯录列表失败: wId={w_id}") return False # 3. 获取好友列表中的wcid friends = address_data.get("friends", []) if not friends: logger.warning(f"好友列表为空: wId={w_id}") return True logger.info(f"获取到好友列表: wId={w_id}, count={len(friends)}") # 4. 将好友w_id添加到Redis忽略列表 friend_ignore_service.add_friends_to_ignore_list(friends) # 5. 批量获取联系人详细信息 return self._batch_sync_contacts(w_id, friends) except Exception as e: logger.error(f"同步联系人信息异常: wId={w_id}, error={str(e)}") return False def _batch_sync_contacts(self, w_id: str, wc_ids: List[str]) -> bool: """ 批量同步联系人信息 Args: w_id: 登录实例标识 wc_ids: 微信ID列表 Returns: 同步成功返回True,失败返回False """ try: # 将wcid列表用逗号拼接 wc_ids_str = ",".join(wc_ids) logger.info(f"开始批量获取联系人信息: wId={w_id}, wc_ids_count={len(wc_ids)}") # 调用获取联系人信息接口 contact_info = ecloud_client.get_contact_info(w_id, wc_ids_str) if not contact_info: logger.error(f"批量获取联系人信息失败: wId={w_id}") return False # 如果返回的是单个联系人信息,转换为列表 if isinstance(contact_info, dict): contact_list = [contact_info] else: contact_list = contact_info logger.info(f"获取到联系人详细信息: count={len(contact_list)}") # 保存到数据库 return self._save_contacts_to_db(contact_list) except Exception as e: logger.error(f"批量同步联系人信息异常: wId={w_id}, error={str(e)}") return False def _save_contacts_to_db(self, contact_list: List[dict]) -> bool: """ 保存联系人信息到数据库 Args: contact_list: 联系人信息列表 Returns: 保存成功返回True,失败返回False """ try: with next(get_db()) as db: saved_count = 0 updated_count = 0 for contact_data in contact_list: wc_id = contact_data.get("userName") or contact_data.get("wcId") if not wc_id: logger.warning(f"联系人信息缺少wcId: {contact_data}") continue # 检查是否已存在 existing_contact = db.query(Contact).filter(Contact.wc_id == wc_id).first() if existing_contact: # 更新现有联系人信息 existing_contact.user_name = contact_data.get("userName") existing_contact.nick_name = contact_data.get("nickName") existing_contact.remark = contact_data.get("remark") existing_contact.signature = contact_data.get("signature") existing_contact.sex = contact_data.get("sex") existing_contact.alias_name = contact_data.get("aliasName") existing_contact.country = contact_data.get("country") existing_contact.big_head = contact_data.get("bigHead") existing_contact.small_head = contact_data.get("smallHead") existing_contact.label_list = contact_data.get("labelList") existing_contact.v1 = contact_data.get("v1") updated_count += 1 logger.debug(f"更新联系人信息: wc_id={wc_id}, nick_name={contact_data.get('nickName')}") else: # 创建新联系人记录 new_contact = Contact( wc_id=wc_id, user_name=contact_data.get("userName"), nick_name=contact_data.get("nickName"), remark=contact_data.get("remark"), signature=contact_data.get("signature"), sex=contact_data.get("sex"), alias_name=contact_data.get("aliasName"), country=contact_data.get("country"), big_head=contact_data.get("bigHead"), small_head=contact_data.get("smallHead"), label_list=contact_data.get("labelList"), v1=contact_data.get("v1"), ) db.add(new_contact) saved_count += 1 logger.debug(f"新增联系人信息: wc_id={wc_id}, nick_name={contact_data.get('nickName')}") # 提交事务 db.commit() logger.info(f"联系人信息保存完成: 新增={saved_count}, 更新={updated_count}") return True except Exception as e: logger.error(f"保存联系人信息到数据库异常: error={str(e)}") if 'db' in locals(): db.rollback() return False # 全局联系人同步服务实例 contact_sync_service = ContactSyncService()