"""
|
群组统计服务
|
用于统计群组中好友的发言次数
|
"""
|
|
import time
|
from typing import Optional, Dict, List, Tuple
|
from loguru import logger
|
from app.services.redis_queue import redis_queue
|
from app.models.contact import Contact
|
from app.models.database import get_db
|
from config import settings
|
|
|
class GroupStatsService:
|
"""群组统计服务"""
|
|
def __init__(self):
|
self.stats_key_prefix = "group_stats:"
|
self.stats_expiry = 24 * 60 * 60 # 24小时过期
|
|
def _get_group_stats_key(self, group_id: str) -> str:
|
"""
|
获取群组统计的Redis键名
|
|
Args:
|
group_id: 群组ID
|
|
Returns:
|
Redis键名
|
"""
|
return f"{self.stats_key_prefix}{group_id}"
|
|
def increment_user_message_count(self, group_id: str, user_id: str) -> bool:
|
"""
|
增加用户在群组中的发言次数
|
|
Args:
|
group_id: 群组ID
|
user_id: 用户ID
|
|
Returns:
|
操作成功返回True,失败返回False
|
"""
|
try:
|
stats_key = self._get_group_stats_key(group_id)
|
|
# 使用Redis的HINCRBY命令增加计数
|
redis_queue.redis_client.hincrby(stats_key, user_id, 1)
|
|
# 设置过期时间
|
redis_queue.redis_client.expire(stats_key, self.stats_expiry)
|
|
logger.debug(f"用户发言次数已增加: group={group_id}, user={user_id}")
|
return True
|
|
except Exception as e:
|
logger.error(
|
f"增加用户发言次数失败: group={group_id}, user={user_id}, error={str(e)}"
|
)
|
return False
|
|
def get_group_message_stats(self, group_id: str) -> Dict[str, int]:
|
"""
|
获取群组中所有用户的发言次数统计
|
|
Args:
|
group_id: 群组ID
|
|
Returns:
|
用户ID到发言次数的映射字典
|
"""
|
try:
|
stats_key = self._get_group_stats_key(group_id)
|
stats = redis_queue.redis_client.hgetall(stats_key)
|
|
# 将字节字符串转换为普通字符串,并转换计数为整数
|
result = {}
|
for user_id, count in stats.items():
|
if isinstance(user_id, bytes):
|
user_id = user_id.decode("utf-8")
|
if isinstance(count, bytes):
|
count = count.decode("utf-8")
|
result[user_id] = int(count)
|
|
logger.debug(f"获取群组发言统计: group={group_id}, stats={result}")
|
return result
|
|
except Exception as e:
|
logger.error(f"获取群组发言统计失败: group={group_id}, error={str(e)}")
|
return {}
|
|
def get_most_active_user_nickname(self, group_id: str) -> str:
|
"""
|
获取群组中发言次数最多的用户昵称
|
|
Args:
|
group_id: 群组ID
|
|
Returns:
|
发言次数最多的用户昵称,如果没有找到或发言次数都相同或都为0,则返回默认客服名称
|
"""
|
try:
|
# 获取群组发言统计
|
stats = self.get_group_message_stats(group_id)
|
|
if not stats:
|
logger.info(f"群组无发言统计数据,使用默认客服名称: group={group_id}")
|
return settings.customer_service_default_name
|
|
# 过滤掉发言次数为0的用户
|
active_stats = {
|
user_id: count for user_id, count in stats.items() if count > 0
|
}
|
|
if not active_stats:
|
logger.info(f"群组无有效发言数据,使用默认客服名称: group={group_id}")
|
return settings.customer_service_default_name
|
|
# 检查是否所有用户发言次数都相同
|
counts = list(active_stats.values())
|
if len(set(counts)) == 1:
|
logger.info(
|
f"群组所有用户发言次数相同,使用默认客服名称: group={group_id}"
|
)
|
return settings.customer_service_default_name
|
|
# 找到发言次数最多的用户
|
most_active_user_id = max(active_stats, key=active_stats.get)
|
max_count = active_stats[most_active_user_id]
|
|
logger.info(
|
f"找到最活跃用户: group={group_id}, user={most_active_user_id}, count={max_count}"
|
)
|
|
# 根据用户ID查找昵称
|
nickname = self._get_user_nickname(most_active_user_id)
|
|
if nickname:
|
logger.info(
|
f"获取到最活跃用户昵称: group={group_id}, user={most_active_user_id}, nickname={nickname}"
|
)
|
return nickname
|
else:
|
logger.warning(
|
f"未找到最活跃用户昵称,使用默认客服名称: group={group_id}, user={most_active_user_id}"
|
)
|
return settings.customer_service_default_name
|
|
except Exception as e:
|
logger.error(f"获取最活跃用户昵称失败: group={group_id}, error={str(e)}")
|
return settings.customer_service_default_name
|
|
def _get_user_nickname(self, user_id: str) -> Optional[str]:
|
"""
|
根据用户ID获取昵称
|
|
Args:
|
user_id: 用户ID
|
|
Returns:
|
用户昵称,如果未找到返回None
|
"""
|
try:
|
with next(get_db()) as db:
|
contact = db.query(Contact).filter(Contact.wc_id == user_id).first()
|
if contact and contact.nick_name:
|
return contact.nick_name
|
else:
|
logger.warning(f"未找到用户昵称: user_id={user_id}")
|
return None
|
except Exception as e:
|
logger.error(f"查询用户昵称异常: user_id={user_id}, error={str(e)}")
|
return None
|
|
def get_group_stats_summary(self, group_id: str) -> Dict:
|
"""
|
获取群组统计摘要信息
|
|
Args:
|
group_id: 群组ID
|
|
Returns:
|
包含统计摘要的字典
|
"""
|
try:
|
stats = self.get_group_message_stats(group_id)
|
most_active_nickname = self.get_most_active_user_nickname(group_id)
|
|
# 计算总发言次数
|
total_messages = sum(stats.values())
|
|
# 获取活跃用户数量(发言次数>0)
|
active_users = len([count for count in stats.values() if count > 0])
|
|
# 构建用户昵称统计
|
user_stats = []
|
for user_id, count in stats.items():
|
if count > 0:
|
nickname = self._get_user_nickname(user_id)
|
user_stats.append(
|
{
|
"user_id": user_id,
|
"nickname": nickname or "未知用户",
|
"message_count": count,
|
}
|
)
|
|
# 按发言次数排序
|
user_stats.sort(key=lambda x: x["message_count"], reverse=True)
|
|
return {
|
"group_id": group_id,
|
"total_messages": total_messages,
|
"active_users": active_users,
|
"most_active_nickname": most_active_nickname,
|
"user_stats": user_stats,
|
}
|
|
except Exception as e:
|
logger.error(f"获取群组统计摘要失败: group={group_id}, error={str(e)}")
|
return {
|
"group_id": group_id,
|
"total_messages": 0,
|
"active_users": 0,
|
"most_active_nickname": settings.customer_service_default_name,
|
"user_stats": [],
|
}
|
|
def clear_group_stats(self, group_id: str) -> bool:
|
"""
|
清空指定群组的统计数据
|
|
Args:
|
group_id: 群组ID
|
|
Returns:
|
操作成功返回True,失败返回False
|
"""
|
try:
|
stats_key = self._get_group_stats_key(group_id)
|
redis_queue.redis_client.delete(stats_key)
|
logger.info(f"已清空群组统计数据: group={group_id}")
|
return True
|
except Exception as e:
|
logger.error(f"清空群组统计数据失败: group={group_id}, error={str(e)}")
|
return False
|
|
|
# 全局群组统计服务实例
|
group_stats_service = GroupStatsService()
|