yj
2025-07-23 1225b6cbf0a028b765a0ab6d784bcb80459a67bb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""
E云管家回调接口
"""
 
import time
from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel
from typing import Dict, Any, Optional
from loguru import logger
 
from app.services.message_processor import message_processor
from app.workers.message_worker import message_worker
 
 
router = APIRouter()
 
 
class CallbackData(BaseModel):
    """回调数据模型"""
 
    account: Optional[str] = None
    messageType: str
    wcId: str
    data: Dict[str, Any]
 
 
class CallbackResponse(BaseModel):
    """回调响应模型"""
 
    success: bool
    message: str
    code: int = 200
 
 
@router.post("/callback", response_model=CallbackResponse)
async def receive_callback(callback_data: CallbackData):
    """
    接收E云管家回调消息
 
    Args:
        callback_data: 回调数据
 
    Returns:
        处理结果
    """
    try:
        logger.info(
            f"收到回调消息: messageType={callback_data.messageType}, wcId={callback_data.wcId}"
        )
 
        # 将Pydantic模型转换为字典
        callback_dict = callback_data.model_dump()
 
        # 将消息加入队列
        success = message_processor.enqueue_callback_message(callback_dict)
 
        logger.info(f"消息入队结果: success={success}")
 
        if success:
            # 获取发送用户ID
            from_user = callback_dict.get("data", {}).get("fromUser")
            if from_user:
                logger.info(f"启动用户队列处理: from_user={from_user}")
                # 启动用户队列处理
                message_worker.process_user_queue(from_user)
 
            return CallbackResponse(
                success=True, message="消息已成功加入处理队列", code=200
            )
        else:
            logger.warning("消息处理失败,未能加入队列")
            return CallbackResponse(success=False, message="消息处理失败", code=400)
 
    except Exception as e:
        logger.error(f"回调处理异常: {str(e)}")
        raise HTTPException(status_code=500, detail=f"服务器内部错误: {str(e)}")
 
 
@router.post("/callback/raw")
async def receive_raw_callback(request: Request):
    """
    接收原始回调数据(用于调试)
 
    Args:
        request: 原始请求
 
    Returns:
        处理结果
    """
    try:
        # 获取原始JSON数据
        raw_data = await request.json()
 
        logger.info(f"收到原始回调数据: {raw_data}")
 
        # 验证必要字段
        if "messageType" not in raw_data or "data" not in raw_data:
            raise HTTPException(status_code=400, detail="缺少必要字段")
 
        # 将消息加入队列
        success = message_processor.enqueue_callback_message(raw_data)
 
        if success:
            # 获取发送用户ID
            from_user = raw_data.get("data", {}).get("fromUser")
            if from_user:
                # 启动用户队列处理
                message_worker.process_user_queue(from_user)
 
            return {"success": True, "message": "消息已成功加入处理队列", "code": 200}
        else:
            return {"success": False, "message": "消息处理失败", "code": 400}
 
    except Exception as e:
        logger.error(f"原始回调处理异常: {str(e)}")
        raise HTTPException(status_code=500, detail=f"服务器内部错误: {str(e)}")
 
 
@router.get("/health")
async def health_check():
    """健康检查接口"""
    return {
        "status": "healthy",
        "message": "服务运行正常",
        "timestamp": int(time.time()),
    }