1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
"""
群组统计服务
用于统计群组中好友的发言次数
"""
 
import time
from typing import Optional, Dict, List, Tuple
from loguru import logger
from app.services.redis_queue import redis_queue
from app.models.contact import Contact
from app.models.database import get_db
from config import settings
 
 
class GroupStatsService:
    """群组统计服务"""
 
    def __init__(self):
        self.stats_key_prefix = "group_stats:"
        self.stats_expiry = 24 * 60 * 60  # 24小时过期
 
    def _get_group_stats_key(self, group_id: str) -> str:
        """
        获取群组统计的Redis键名
 
        Args:
            group_id: 群组ID
 
        Returns:
            Redis键名
        """
        return f"{self.stats_key_prefix}{group_id}"
 
    def increment_user_message_count(self, group_id: str, user_id: str) -> bool:
        """
        增加用户在群组中的发言次数
 
        Args:
            group_id: 群组ID
            user_id: 用户ID
 
        Returns:
            操作成功返回True,失败返回False
        """
        try:
            stats_key = self._get_group_stats_key(group_id)
 
            # 使用Redis的HINCRBY命令增加计数
            redis_queue.redis_client.hincrby(stats_key, user_id, 1)
 
            # 设置过期时间
            redis_queue.redis_client.expire(stats_key, self.stats_expiry)
 
            logger.debug(f"用户发言次数已增加: group={group_id}, user={user_id}")
            return True
 
        except Exception as e:
            logger.error(
                f"增加用户发言次数失败: group={group_id}, user={user_id}, error={str(e)}"
            )
            return False
 
    def get_group_message_stats(self, group_id: str) -> Dict[str, int]:
        """
        获取群组中所有用户的发言次数统计
 
        Args:
            group_id: 群组ID
 
        Returns:
            用户ID到发言次数的映射字典
        """
        try:
            stats_key = self._get_group_stats_key(group_id)
            stats = redis_queue.redis_client.hgetall(stats_key)
 
            # 将字节字符串转换为普通字符串,并转换计数为整数
            result = {}
            for user_id, count in stats.items():
                if isinstance(user_id, bytes):
                    user_id = user_id.decode("utf-8")
                if isinstance(count, bytes):
                    count = count.decode("utf-8")
                result[user_id] = int(count)
 
            logger.debug(f"获取群组发言统计: group={group_id}, stats={result}")
            return result
 
        except Exception as e:
            logger.error(f"获取群组发言统计失败: group={group_id}, error={str(e)}")
            return {}
 
    def get_most_active_user_nickname(self, group_id: str) -> str:
        """
        获取群组中发言次数最多的用户昵称
 
        Args:
            group_id: 群组ID
 
        Returns:
            发言次数最多的用户昵称,如果没有找到或发言次数都相同或都为0,则返回默认客服名称
        """
        try:
            # 获取群组发言统计
            stats = self.get_group_message_stats(group_id)
 
            if not stats:
                logger.info(f"群组无发言统计数据,使用默认客服名称: group={group_id}")
                return settings.customer_service_default_name
 
            # 过滤掉发言次数为0的用户
            active_stats = {
                user_id: count for user_id, count in stats.items() if count > 0
            }
 
            if not active_stats:
                logger.info(f"群组无有效发言数据,使用默认客服名称: group={group_id}")
                return settings.customer_service_default_name
 
            # 检查是否所有用户发言次数都相同
            counts = list(active_stats.values())
            if len(set(counts)) == 1:
                logger.info(
                    f"群组所有用户发言次数相同,使用默认客服名称: group={group_id}"
                )
                return settings.customer_service_default_name
 
            # 找到发言次数最多的用户
            most_active_user_id = max(active_stats, key=active_stats.get)
            max_count = active_stats[most_active_user_id]
 
            logger.info(
                f"找到最活跃用户: group={group_id}, user={most_active_user_id}, count={max_count}"
            )
 
            # 根据用户ID查找昵称
            nickname = self._get_user_nickname(most_active_user_id)
 
            if nickname:
                logger.info(
                    f"获取到最活跃用户昵称: group={group_id}, user={most_active_user_id}, nickname={nickname}"
                )
                return nickname
            else:
                logger.warning(
                    f"未找到最活跃用户昵称,使用默认客服名称: group={group_id}, user={most_active_user_id}"
                )
                return settings.customer_service_default_name
 
        except Exception as e:
            logger.error(f"获取最活跃用户昵称失败: group={group_id}, error={str(e)}")
            return settings.customer_service_default_name
 
    def _get_user_nickname(self, user_id: str) -> Optional[str]:
        """
        根据用户ID获取昵称
 
        Args:
            user_id: 用户ID
 
        Returns:
            用户昵称,如果未找到返回None
        """
        try:
            with next(get_db()) as db:
                contact = db.query(Contact).filter(Contact.wc_id == user_id).first()
                if contact and contact.nick_name:
                    return contact.nick_name
                else:
                    logger.warning(f"未找到用户昵称: user_id={user_id}")
                    return None
        except Exception as e:
            logger.error(f"查询用户昵称异常: user_id={user_id}, error={str(e)}")
            return None
 
    def get_group_stats_summary(self, group_id: str) -> Dict:
        """
        获取群组统计摘要信息
 
        Args:
            group_id: 群组ID
 
        Returns:
            包含统计摘要的字典
        """
        try:
            stats = self.get_group_message_stats(group_id)
            most_active_nickname = self.get_most_active_user_nickname(group_id)
 
            # 计算总发言次数
            total_messages = sum(stats.values())
 
            # 获取活跃用户数量(发言次数>0)
            active_users = len([count for count in stats.values() if count > 0])
 
            # 构建用户昵称统计
            user_stats = []
            for user_id, count in stats.items():
                if count > 0:
                    nickname = self._get_user_nickname(user_id)
                    user_stats.append(
                        {
                            "user_id": user_id,
                            "nickname": nickname or "未知用户",
                            "message_count": count,
                        }
                    )
 
            # 按发言次数排序
            user_stats.sort(key=lambda x: x["message_count"], reverse=True)
 
            return {
                "group_id": group_id,
                "total_messages": total_messages,
                "active_users": active_users,
                "most_active_nickname": most_active_nickname,
                "user_stats": user_stats,
            }
 
        except Exception as e:
            logger.error(f"获取群组统计摘要失败: group={group_id}, error={str(e)}")
            return {
                "group_id": group_id,
                "total_messages": 0,
                "active_users": 0,
                "most_active_nickname": settings.customer_service_default_name,
                "user_stats": [],
            }
 
    def clear_group_stats(self, group_id: str) -> bool:
        """
        清空指定群组的统计数据
 
        Args:
            group_id: 群组ID
 
        Returns:
            操作成功返回True,失败返回False
        """
        try:
            stats_key = self._get_group_stats_key(group_id)
            redis_queue.redis_client.delete(stats_key)
            logger.info(f"已清空群组统计数据: group={group_id}")
            return True
        except Exception as e:
            logger.error(f"清空群组统计数据失败: group={group_id}, error={str(e)}")
            return False
 
 
# 全局群组统计服务实例
group_stats_service = GroupStatsService()