""" 群组统计服务 用于统计群组中好友的发言次数 """ import time from typing import Optional, Dict, List, Tuple from loguru import logger from app.services.redis_queue import redis_queue from app.models.contact import Contact from app.models.database import get_db from config import settings class GroupStatsService: """群组统计服务""" def __init__(self): self.stats_key_prefix = "group_stats:" self.stats_expiry = 24 * 60 * 60 # 24小时过期 def _get_group_stats_key(self, group_id: str) -> str: """ 获取群组统计的Redis键名 Args: group_id: 群组ID Returns: Redis键名 """ return f"{self.stats_key_prefix}{group_id}" def increment_user_message_count(self, group_id: str, user_id: str) -> bool: """ 增加用户在群组中的发言次数 Args: group_id: 群组ID user_id: 用户ID Returns: 操作成功返回True,失败返回False """ try: stats_key = self._get_group_stats_key(group_id) # 使用Redis的HINCRBY命令增加计数 redis_queue.redis_client.hincrby(stats_key, user_id, 1) # 设置过期时间 redis_queue.redis_client.expire(stats_key, self.stats_expiry) logger.debug(f"用户发言次数已增加: group={group_id}, user={user_id}") return True except Exception as e: logger.error( f"增加用户发言次数失败: group={group_id}, user={user_id}, error={str(e)}" ) return False def get_group_message_stats(self, group_id: str) -> Dict[str, int]: """ 获取群组中所有用户的发言次数统计 Args: group_id: 群组ID Returns: 用户ID到发言次数的映射字典 """ try: stats_key = self._get_group_stats_key(group_id) stats = redis_queue.redis_client.hgetall(stats_key) # 将字节字符串转换为普通字符串,并转换计数为整数 result = {} for user_id, count in stats.items(): if isinstance(user_id, bytes): user_id = user_id.decode("utf-8") if isinstance(count, bytes): count = count.decode("utf-8") result[user_id] = int(count) logger.debug(f"获取群组发言统计: group={group_id}, stats={result}") return result except Exception as e: logger.error(f"获取群组发言统计失败: group={group_id}, error={str(e)}") return {} def get_most_active_user_nickname(self, group_id: str) -> str: """ 获取群组中发言次数最多的用户昵称 Args: group_id: 群组ID Returns: 发言次数最多的用户昵称,如果没有找到或发言次数都相同或都为0,则返回默认客服名称 """ try: # 获取群组发言统计 stats = self.get_group_message_stats(group_id) if not stats: logger.info(f"群组无发言统计数据,使用默认客服名称: group={group_id}") return settings.customer_service_default_name # 过滤掉发言次数为0的用户 active_stats = { user_id: count for user_id, count in stats.items() if count > 0 } if not active_stats: logger.info(f"群组无有效发言数据,使用默认客服名称: group={group_id}") return settings.customer_service_default_name # 检查是否所有用户发言次数都相同 counts = list(active_stats.values()) if len(set(counts)) == 1: logger.info( f"群组所有用户发言次数相同,使用默认客服名称: group={group_id}" ) return settings.customer_service_default_name # 找到发言次数最多的用户 most_active_user_id = max(active_stats, key=active_stats.get) max_count = active_stats[most_active_user_id] logger.info( f"找到最活跃用户: group={group_id}, user={most_active_user_id}, count={max_count}" ) # 根据用户ID查找昵称 nickname = self._get_user_nickname(most_active_user_id) if nickname: logger.info( f"获取到最活跃用户昵称: group={group_id}, user={most_active_user_id}, nickname={nickname}" ) return nickname else: logger.warning( f"未找到最活跃用户昵称,使用默认客服名称: group={group_id}, user={most_active_user_id}" ) return settings.customer_service_default_name except Exception as e: logger.error(f"获取最活跃用户昵称失败: group={group_id}, error={str(e)}") return settings.customer_service_default_name def _get_user_nickname(self, user_id: str) -> Optional[str]: """ 根据用户ID获取昵称 Args: user_id: 用户ID Returns: 用户昵称,如果未找到返回None """ try: with next(get_db()) as db: contact = db.query(Contact).filter(Contact.wc_id == user_id).first() if contact and contact.nick_name: return contact.nick_name else: logger.warning(f"未找到用户昵称: user_id={user_id}") return None except Exception as e: logger.error(f"查询用户昵称异常: user_id={user_id}, error={str(e)}") return None def get_group_stats_summary(self, group_id: str) -> Dict: """ 获取群组统计摘要信息 Args: group_id: 群组ID Returns: 包含统计摘要的字典 """ try: stats = self.get_group_message_stats(group_id) most_active_nickname = self.get_most_active_user_nickname(group_id) # 计算总发言次数 total_messages = sum(stats.values()) # 获取活跃用户数量(发言次数>0) active_users = len([count for count in stats.values() if count > 0]) # 构建用户昵称统计 user_stats = [] for user_id, count in stats.items(): if count > 0: nickname = self._get_user_nickname(user_id) user_stats.append( { "user_id": user_id, "nickname": nickname or "未知用户", "message_count": count, } ) # 按发言次数排序 user_stats.sort(key=lambda x: x["message_count"], reverse=True) return { "group_id": group_id, "total_messages": total_messages, "active_users": active_users, "most_active_nickname": most_active_nickname, "user_stats": user_stats, } except Exception as e: logger.error(f"获取群组统计摘要失败: group={group_id}, error={str(e)}") return { "group_id": group_id, "total_messages": 0, "active_users": 0, "most_active_nickname": settings.customer_service_default_name, "user_stats": [], } def clear_group_stats(self, group_id: str) -> bool: """ 清空指定群组的统计数据 Args: group_id: 群组ID Returns: 操作成功返回True,失败返回False """ try: stats_key = self._get_group_stats_key(group_id) redis_queue.redis_client.delete(stats_key) logger.info(f"已清空群组统计数据: group={group_id}") return True except Exception as e: logger.error(f"清空群组统计数据失败: group={group_id}, error={str(e)}") return False # 全局群组统计服务实例 group_stats_service = GroupStatsService()