yj
2025-07-22 b1462f7eea87f79655cae7b0438e3af7b6e3cfca
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
"""
消息处理核心逻辑
"""
 
import json
from typing import Dict, Any, Optional
from sqlalchemy.orm import Session
from loguru import logger
 
from app.models.database import get_db
from app.models.contact import Contact
from app.models.conversation import Conversation
from app.services.redis_queue import redis_queue
from app.services.ecloud_client import ecloud_client
from app.services.dify_client import dify_client
 
 
class MessageProcessor:
    """消息处理器"""
 
    def __init__(self):
        pass
 
    def is_valid_group_message(self, callback_data: Dict[str, Any]) -> bool:
        """
        检查是否是有效的群聊消息
 
        Args:
            callback_data: 回调数据
 
        Returns:
            是否是有效的群聊消息
        """
        # 检查消息类型是否是群聊消息(80001)
        message_type = callback_data.get("messageType")
        if message_type != "80001":
            logger.info(f"忽略非群聊消息: messageType={message_type}")
            return False
 
        # 检查是否是自己发送的消息
        data = callback_data.get("data", {})
        if data.get("self", False):
            logger.info(f"忽略自己发送的消息: fromUser={data.get('fromUser')}")
            return False
 
        # 检查必要字段
        if (
            not data.get("fromUser")
            or not data.get("fromGroup")
            or not data.get("content")
        ):
            logger.warning(f"消息缺少必要字段: data={data}")
            return False
 
        return True
 
    def enqueue_callback_message(self, callback_data: Dict[str, Any]) -> bool:
        """
        将回调消息加入队列
 
        Args:
            callback_data: 回调数据
 
        Returns:
            是否成功加入队列
        """
        # 验证消息有效性
        if not self.is_valid_group_message(callback_data):
            return False
 
        data = callback_data.get("data", {})
        from_user = data.get("fromUser")
 
        # 将整个回调内容转成JSON字符串存储到Redis队列
        return redis_queue.enqueue_message(from_user, callback_data)
 
    def ensure_contact_exists(self, from_group: str, w_id: str, db: Session) -> bool:
        """
        确保联系人信息存在于数据库中
 
        Args:
            from_group: 群组ID
            w_id: 登录实例标识
            db: 数据库会话
 
        Returns:
            是否成功确保联系人存在
        """
        try:
            # 检查数据库中是否已存在该联系人
            existing_contact = (
                db.query(Contact).filter(Contact.wc_id == from_group).first()
            )
 
            if existing_contact:
                logger.info(f"联系人已存在: wc_id={from_group}")
                return True
 
            # 调用E云管家API获取联系人信息
            contact_info = ecloud_client.get_contact_info(w_id, from_group)
 
            if not contact_info:
                logger.error(f"无法获取联系人信息: wc_id={from_group}")
                return False
 
            # 保存联系人信息到数据库
            new_contact = Contact(
                wc_id=from_group,
                user_name=contact_info.get("userName"),
                nick_name=contact_info.get("nickName"),
                remark=contact_info.get("remark"),
                signature=contact_info.get("signature"),
                sex=contact_info.get("sex"),
                alias_name=contact_info.get("aliasName"),
                country=contact_info.get("country"),
                big_head=contact_info.get("bigHead"),
                small_head=contact_info.get("smallHead"),
                label_list=contact_info.get("labelList"),
                v1=contact_info.get("v1"),
            )
 
            db.add(new_contact)
            db.commit()
 
            logger.info(
                f"成功保存联系人信息: wc_id={from_group}, nick_name={contact_info.get('nickName')}"
            )
            return True
 
        except Exception as e:
            logger.error(f"确保联系人存在失败: wc_id={from_group}, error={str(e)}")
            db.rollback()
            return False
 
    def process_single_message(self, message_data: Dict[str, Any]) -> bool:
        """
        处理单条消息
 
        Args:
            message_data: 消息数据
 
        Returns:
            是否处理成功
        """
        try:
            data = message_data.get("data", {})
            from_user = data.get("fromUser")
            from_group = data.get("fromGroup")
            content = data.get("content")
            w_id = data.get("wId")
 
            logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}")
 
            # 获取数据库会话
            db = next(get_db())
 
            try:
                # 3.1 确保联系人信息存在
                if not self.ensure_contact_exists(from_group, w_id, db):
                    logger.error(f"联系人信息处理失败: from_group={from_group}")
                    return False
 
                # 3.2 获取用户的conversation_id
                conversation_id = redis_queue.get_conversation_id(from_user)
 
                # 调用Dify接口发送消息
                dify_response = dify_client.send_chat_message(
                    query=content, user=from_user, conversation_id=conversation_id
                )
 
                if not dify_response:
                    logger.error(f"Dify响应失败: from_user={from_user}")
                    return False
 
                # 获取AI回答和新的conversation_id
                ai_answer = dify_response.get("answer", "")
                new_conversation_id = dify_response.get("conversation_id", "")
 
                # 更新Redis中的conversation_id
                if new_conversation_id:
                    redis_queue.set_conversation_id(from_user, new_conversation_id)
 
                # 3.3 保存对话记录到数据库
                user_conversation_key = f"{from_user}_{new_conversation_id}"
 
                # 检查是否已存在记录
                existing_conversation = (
                    db.query(Conversation)
                    .filter(Conversation.user_conversation_key == user_conversation_key)
                    .first()
                )
 
                if existing_conversation:
                    # 更新现有记录
                    existing_conversation.user_question = content
                    existing_conversation.ai_answer = ai_answer
                    existing_conversation.is_processed = True
                    logger.info(f"更新对话记录: key={user_conversation_key}")
                else:
                    # 创建新记录
                    new_conversation = Conversation(
                        user_conversation_key=user_conversation_key,
                        from_user=from_user,
                        conversation_id=new_conversation_id,
                        user_question=content,
                        ai_answer=ai_answer,
                        is_processed=True,
                    )
                    db.add(new_conversation)
                    logger.info(f"创建对话记录: key={user_conversation_key}")
 
                db.commit()
 
                # 发送AI回答到群聊
                if ai_answer and ecloud_client.send_group_message(
                    w_id, from_group, ai_answer
                ):
                    # 更新发送状态
                    conversation = (
                        db.query(Conversation)
                        .filter(
                            Conversation.user_conversation_key == user_conversation_key
                        )
                        .first()
                    )
                    if conversation:
                        conversation.is_sent = True
                        db.commit()
 
                    logger.info(f"消息处理完成: from_user={from_user}")
                    return True
                else:
                    logger.error(f"发送AI回答失败: from_user={from_user}")
                    return False
 
            finally:
                db.close()
 
        except Exception as e:
            logger.error(f"处理消息异常: from_user={from_user}, error={str(e)}")
            return False
 
 
# 全局消息处理器实例
message_processor = MessageProcessor()