yj
2025-09-03 f28ac0166536a2a4b68cac685a41ea667f60f7e9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
"""
消息处理核心逻辑
"""
 
import json
import time
import re
import xml.etree.ElementTree as ET
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime
from sqlalchemy.orm import Session
from loguru import logger
 
from app.models.database import get_db
from app.models.contact import Contact
from app.models.conversation import Conversation
from app.services.redis_queue import redis_queue
from app.services.ecloud_client import ecloud_client
from app.services.dify_client import dify_client
from app.services.friend_ignore_service import friend_ignore_service
from app.services.silence_service import silence_service
from app.services.group_stats_service import group_stats_service
from config import settings
 
 
class MessageProcessor:
    """消息处理器"""
 
    def __init__(self) -> None:
        """Create a processor; no per-instance state is set up here."""
        pass
 
    def extract_refer_message_content(self, callback_data: Dict[str, Any]) -> str:
        """
        提取引用消息的内容(message_type为80014)
 
        Args:
            callback_data: 回调数据
 
        Returns:
            组合后的消息内容(content + title,中间空一行)
        """
        try:
            data = callback_data.get("data", {})
            content = data.get("content", "")
            title = data.get("title", "")
 
            # 解析XML内容,提取msg>appmsg>refermsg>content标签中的内容
            refer_content = ""
            xml_title = ""
            try:
                # 解析XML
                root = ET.fromstring(content)
 
                # 查找msg>appmsg>refermsg>content路径
                appmsg = root.find("appmsg")
                if appmsg is not None:
                    # 提取XML中的title(如果存在)
                    title_element = appmsg.find("title")
                    if title_element is not None and title_element.text:
                        xml_title = title_element.text.strip()
 
                    # 提取引用消息内容
                    refermsg = appmsg.find("refermsg")
                    if refermsg is not None:
                        content_element = refermsg.find("content")
                        if content_element is not None and content_element.text:
                            refer_content = content_element.text.strip()
 
            except ET.ParseError as e:
                logger.warning(f"XML解析失败: {str(e)}, content={content}")
                # 如果XML解析失败,使用原始content
                refer_content = content
 
            # 确定最终使用的title:优先使用XML中的title,其次使用data.title
            final_title = xml_title if xml_title else title
 
            # 组合内容:refer_content在前,final_title在后,中间空一行
            if refer_content and final_title:
                combined_content = f"{refer_content}\n\n{final_title}"
            elif refer_content:
                combined_content = refer_content
            elif final_title:
                combined_content = final_title
            else:
                combined_content = content  # 如果都没有,使用原始content
 
            logger.info(
                f"引用消息内容提取完成: refer_content_length={len(refer_content)}, xml_title_length={len(xml_title)}, data_title_length={len(title)}, final_title_length={len(final_title)}"
            )
            return combined_content
 
        except Exception as e:
            logger.error(f"提取引用消息内容异常: error={str(e)}")
            # 异常情况下返回原始content
            return callback_data.get("data", {}).get("content", "")
 
    def parse_at_mentions(self, ai_answer: str, from_user: str) -> Tuple[str, List[str]]:
        """
        解析AI回复中的@字符,提取客服名称
 
        Args:
            ai_answer: AI回复内容
 
        Returns:
            (处理后的消息内容, 需要@的客服wcid列表)
        """
        try:
            # 获取配置的客服名称列表
            customer_service_names = settings.customer_service_names
 
            # 查找所有@字符后的客服名称
            at_pattern = r"@([^\s]+)"
            matches = re.findall(at_pattern, ai_answer)
 
            valid_at_names = []
            at_wc_ids = []
 
            for match in matches:
                # 检查是否在配置的客服名称列表中
                if match in customer_service_names:
                    valid_at_names.append(match)
                    logger.info(f"发现有效的@客服名称: {match}")
 
            # 如果有有效的@客服名称,查询数据库获取wcid
            if valid_at_names:
                with next(get_db()) as db:
                    for name in valid_at_names:
                        # 根据nick_name查找联系人
                        contact = (
                            db.query(Contact).filter(Contact.nick_name == name).first()
                        )
                        if contact:
                            # 如果from_user包含@openim表示是企业微信,使用work_wc_id
                            if "@openim" in from_user:
                                at_wc_ids.append(contact.work_wc_id)
                                logger.info(
                                    f"找到客服联系人-微信: name={name}, wc_id={contact.work_wc_id}"
                                )
                            else:
                                at_wc_ids.append(contact.wc_id)
                                logger.info(
                                    f"找到客服联系人-企业微信: name={name}, wc_id={contact.wc_id}"
                                )
                        else:
                            logger.warning(f"未找到客服联系人: name={name}")
 
            return ai_answer, at_wc_ids
 
        except Exception as e:
            logger.error(f"解析@字符异常: error={str(e)}")
            return ai_answer, []
 
    def is_end_str(self, ai_answer: str) -> bool:
        """
        解析AI回复判断是否是结束字符串
 
        Args:
            ai_answer: AI回复内容
 
        Returns:
        """
        try:
            # 获取配置的结束字符串列表
            end_str_list = settings.end_str_list
            for end_str in end_str_list:
                substrings = end_str.split(',')
                # 检查 match_str 是否包含每一个子字符串
                if all(sub in ai_answer for sub in substrings):
                    return True
            return False
        except Exception as e:
            logger.error(f"解析结束字符串异常: error={str(e)}")
            return False
 
    def is_valid_group_message(self, callback_data: Dict[str, Any]) -> bool:
        """
        检查是否是有效的群聊消息
 
        Args:
            callback_data: 回调数据
 
        Returns:
            是否是有效的群聊消息
        """
        # 检查消息类型是否是群聊消息(80001)或引用消息(80014)
        message_type = callback_data.get("messageType")
        print(f"data: {callback_data}")
        if message_type not in ["80001", "80014"]:
            logger.info(f"忽略非群聊消息: messageType={message_type}")
            return False
 
        # 检查是否是自己发送的消息
        data = callback_data.get("data", {})
        if data.get("self", False):
            logger.info(f"忽略自己发送的消息: fromUser={data.get('fromUser')}")
            return False
 
        # 检查必要字段
        if (
            not data.get("fromUser")
            or not data.get("fromGroup")
            or not data.get("content")
        ):
            logger.warning(f"消息缺少必要字段: data={data}")
            return False
 
        # 获取用户和群组信息
        from_user = data.get("fromUser")
        from_group = data.get("fromGroup")
 
        # 检查发送者是否在好友忽略列表中(传入群组ID用于测试群组检查)
        is_friend_ignored = friend_ignore_service.is_friend_ignored(from_user, from_group)
 
        if is_friend_ignored:
            logger.info(
                f"忽略好友发送的消息: fromUser={from_user}, fromGroup={from_group}"
            )
            # 统计被忽略的好友发言次数(确保被忽略的好友消息也纳入统计)
            group_stats_service.increment_user_message_count(from_group, from_user)
            # 激活或延长该群组的静默模式
            if silence_service.is_silence_active(from_group):
                # 如果该群组静默模式已激活,延长时间
                silence_service.extend_silence_mode(from_group)
                logger.info(
                    f"好友消息被忽略,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}"
                )
            else:
                # 如果该群组静默模式未激活,激活静默模式
                silence_service.activate_silence_mode(from_group)
                logger.info(
                    f"好友消息被忽略,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}"
                )
            return False
 
        # # 统计正常处理的好友发言次数
        # group_stats_service.increment_user_message_count(from_group, from_user)
 
        # 检查该群组的静默模式是否激活(在好友忽略检查之后)
        if silence_service.is_silence_active(from_group):
            logger.info(f"群组静默模式激活中,忽略消息: fromGroup={from_group}")
            return False
 
        return True
 
    def enqueue_callback_message(self, callback_data: Dict[str, Any]) -> bool:
        """
        将回调消息加入队列
 
        Args:
            callback_data: 回调数据
 
        Returns:
            是否成功加入队列
        """
        # 验证消息有效性
        if not self.is_valid_group_message(callback_data):
            return False
 
        # 导入消息聚合服务(延迟导入避免循环依赖)
        from app.services.message_aggregator import message_aggregator
 
        # 尝试将消息添加到聚合中
        should_process_immediately, aggregated_data = (
            message_aggregator.add_message_to_aggregation(callback_data)
        )
 
        if should_process_immediately:
            # 需要立即处理(不聚合或聚合超时)
            data = (
                aggregated_data.get("data", {})
                if aggregated_data
                else callback_data.get("data", {})
            )
            from_user = data.get("fromUser")
 
            # 将消息加入Redis队列
            return redis_queue.enqueue_message(
                from_user, aggregated_data or callback_data
            )
        else:
            # 消息已被聚合,不需要立即处理
            logger.info(
                f"消息已添加到聚合队列,等待聚合处理: fromUser={callback_data.get('data', {}).get('fromUser')}"
            )
            return True
 
    def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool:
        """
        确保联系人信息存在于数据库中
 
        Args:
            from_group: 群组ID
            w_id: 登录实例标识
            db: 数据库会话
 
        Returns:
            是否成功确保联系人存在
        """
        try:
            # 检查数据库中是否已存在该联系人
            existing_contact = (
                db.query(Contact).filter(Contact.wc_id == from_group).first()
            )
 
            if existing_contact:
                logger.info(f"联系人已存在: wc_id={from_group}")
                return True
 
            # 调用E云管家API获取联系人信息
            contact_info = ecloud_client.get_contact_info(w_id, from_group)
 
            if not contact_info:
                logger.error(f"无法获取联系人信息: wc_id={from_group}")
                return False
 
            # 保存联系人信息到数据库
            new_contact = Contact(
                wc_id=from_group,
                user_name=contact_info.get("userName"),
                nick_name=contact_info.get("nickName"),
                remark=contact_info.get("remark"),
                signature=contact_info.get("signature"),
                sex=contact_info.get("sex"),
                alias_name=contact_info.get("aliasName"),
                country=contact_info.get("country"),
                big_head=contact_info.get("bigHead"),
                small_head=contact_info.get("smallHead"),
                label_list=contact_info.get("labelList"),
                v1=contact_info.get("v1"),
            )
 
            db.add(new_contact)
            db.commit()
 
            logger.info(
                f"成功保存联系人信息: wc_id={from_group}, nick_name={contact_info.get('nickName')}"
            )
            return True
 
        except Exception as e:
            logger.error(f"确保联系人存在失败: wc_id={from_group}, error={str(e)}")
            db.rollback()
            return False
 
    def process_single_message(self, message_data: Dict[str, Any]) -> bool:
        """
        处理单条消息
 
        Args:
            message_data: 消息数据
 
        Returns:
            是否处理成功
        """
        try:
            data = message_data.get("data", {})
            from_user = data.get("fromUser")
            from_group = data.get("fromGroup")
            content = data.get("content")
            w_id = data.get("wId")
            message_type = message_data.get("messageType")
 
            logger.info(
                f"开始处理消息: from_user={from_user}, from_group={from_group}, message_type={message_type}"
            )
 
            # 根据消息类型处理内容
            if message_type == "80014":
                # 引用消息,需要提取XML中的内容并与title组合
                content = self.extract_refer_message_content(message_data)
                logger.info(f"引用消息内容处理完成: content_length={len(content)}")
 
            # 使用上下文管理器确保数据库会话正确管理
            with next(get_db()) as db:
                # 3.1 确保联系人信息存在
                if not self.ensure_contact_exists(from_group, w_id, db):
                    logger.error(f"联系人信息处理失败: from_group={from_group}")
                    return False
 
                # 3.2 获取群组中发言次数最多的用户昵称
                # most_active_nickname = (
                #     group_stats_service.get_most_active_user_nickname(from_group)
                # )
                # logger.info(
                #     f"群组最活跃用户昵称: group={from_group}, nickname={most_active_nickname}"
                # )
                # 获取默认客服名称
                nickname = settings.customer_service_default_name
 
                # 3.3 获取用户在当前群组的conversation_id
                conversation_id = redis_queue.get_conversation_id(from_user, from_group)
 
                # 调用Dify接口发送消息(根据配置选择模式)
                dify_response = dify_client.send_message(
                    query=content,
                    user=from_user,
                    conversation_id=conversation_id,
                    nick_name=nickname,
                )
 
                if silence_service.is_silence_active(from_group):
                    # 回复前判断是否激活静默,已静默则不回复
                    logger.error(f"Dify已响应但群组已静默:from_user={from_user}")
                    return False
 
                if not dify_response:
                    logger.error(f"Dify响应失败: from_user={from_user}")
                    return False
 
                # 获取AI回答和新的conversation_id
                ai_answer = dify_response.get("answer", "")
                new_conversation_id = dify_response.get("conversation_id", "")
 
                # 更新Redis中的conversation_id(基于用户+群组)
                if new_conversation_id:
                    redis_queue.set_conversation_id(
                        from_user, new_conversation_id, from_group, settings.silence_duration_minutes * 60
                    )
 
                # 3.4 保存对话记录到数据库
                # 按用户、群组和小时分组对话记录
                current_time = datetime.now()
                hour_key = current_time.strftime("%Y%m%d_%H")
 
                # 查找当前用户在当前群组当前小时的对话记录
                existing_conversation = (
                    db.query(Conversation)
                    .filter(
                        Conversation.from_user == from_user,
                        Conversation.conversation_id == new_conversation_id,
                        Conversation.group == from_group,
                        Conversation.hour == hour_key,
                    )
                    .first()
                )
 
                if existing_conversation:
                    # 更新现有记录 - 使用JSON格式追加对话内容(当前用户在当前群组当前小时的对话)
                    try:
                        # 解析现有的content JSON
                        if existing_conversation.content:
                            content_list = json.loads(existing_conversation.content)
                        else:
                            content_list = []
 
                        # 追加新的对话内容
                        content_list.append({"user": content, "ai": ai_answer})
 
                        # 更新记录
                        existing_conversation.content = json.dumps(
                            content_list, ensure_ascii=False
                        )
                        existing_conversation.is_processed = True
                        logger.info(
                            f"追加到当前用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 对话轮次={len(content_list)}"
                        )
                    except json.JSONDecodeError as e:
                        logger.error(f"解析现有对话内容JSON失败: {str(e)}, 重新创建")
                        # 如果JSON解析失败,重新创建content
                        content_list = [{"user": content, "ai": ai_answer}]
                        existing_conversation.content = json.dumps(
                            content_list, ensure_ascii=False
                        )
                        existing_conversation.is_processed = True
                else:
                    # 创建新记录 - 新的用户群组小时对话或首次对话,使用JSON格式存储对话内容
                    content_list = [{"user": content, "ai": ai_answer}]
                    new_conversation = Conversation(
                        from_user=from_user,
                        conversation_id=new_conversation_id,
                        group=from_group,
                        hour=hour_key,
                        content=json.dumps(content_list, ensure_ascii=False),
                        is_processed=True,
                    )
                    db.add(new_conversation)
                    logger.info(
                        f"创建新的用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, 初始对话轮次=1"
                    )
 
                db.commit()
 
                # 发送AI回答到群聊
                success = False
                if ai_answer:
                    # 解析AI回复中的@字符
                    processed_answer, at_wc_ids = self.parse_at_mentions(ai_answer, from_user)
                    # 判断AI回复是否是结束字符串
                    is_end_str = self.is_end_str(ai_answer)
                    # 发送消息,最多重试3次
                    for attempt in range(3):
                        if at_wc_ids:
                            # 如果有@客服,使用群聊@接口
                            logger.info(f"使用群聊@接口发送消息: at_wc_ids={at_wc_ids}")
                            if ecloud_client.send_group_at_message(
                                w_id, from_group, processed_answer, at_wc_ids
                            ):
                                # @后触发静默模式
                                if silence_service.is_silence_active(from_group):
                                    # 如果该群组静默模式已激活,延长时间
                                    silence_service.extend_silence_mode(from_group)
                                    logger.info(
                                        f"AI回复@客服,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}"
                                    )
                                else:
                                    # 如果该群组静默模式未激活,激活静默模式
                                    flag = silence_service.activate_silence_mode(from_group)
                                    if flag:
                                        logger.info(
                                        f"AI回复@客服,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}"
                                        )
                                    else:
                                        logger.info(
                                        f"AI回复@客服,未激活静默模式: fromUser={from_user}, fromGroup={from_group}"
                                        )
                                success = True
                                break
                        else:
                            # 普通群聊消息
                            logger.info("使用普通群聊接口发送消息")
                            if ecloud_client.send_group_message(
                                w_id, from_group, processed_answer
                            ):
                                success = True
                                break
 
                        logger.warning(
                            f"发送AI回答失败,尝试重试 ({attempt + 1}/3): from_user={from_user}"
                        )
                        if attempt < 2:  # 不是最后一次尝试,等待一段时间再重试
                            time.sleep(2**attempt)  # 指数退避
 
                    # AI回复结束字符串触发静默模式
                    if is_end_str:
                        if silence_service.is_silence_active(from_group):
                            # 如果该群组静默模式已激活,延长时间
                            silence_service.extend_silence_mode(from_group)
                            logger.info(
                                f"AI回复结束字符串,群组静默模式时间已刷新: fromUser={from_user}, fromGroup={from_group}"
                            )
                        else:
                            # 如果该群组静默模式未激活,激活静默模式
                            silence_service.activate_silence_mode(from_group)
                            logger.info(
                                f"AI回复结束字符串,群组静默模式已激活: fromUser={from_user}, fromGroup={from_group}"
                            )
 
                if success:
                    # 更新发送状态
                    conversation = (
                        db.query(Conversation)
                        .filter(
                            Conversation.from_user == from_user,
                            Conversation.conversation_id == new_conversation_id,
                            Conversation.group == from_group,
                            Conversation.hour == hour_key,
                        )
                        .first()
                    )
                    if conversation:
                        conversation.is_sent = True
                        conversation.sent_time = current_time
                        db.commit()
 
                    logger.info(f"消息处理完成: from_user={from_user}")
                    return True
                else:
                    logger.error(f"发送AI回答失败: from_user={from_user}")
                    return False
 
        except Exception as e:
            logger.error(f"处理消息异常: from_user={from_user}, error={str(e)}")
            return False
 
 
# Global MessageProcessor instance, created when this module is imported.
message_processor = MessageProcessor()