"""
|
E云管家回调接口
|
"""
|
|
import time
|
from fastapi import APIRouter, HTTPException, Request
|
from pydantic import BaseModel
|
from typing import Dict, Any, Optional
|
from loguru import logger
|
|
from app.services.message_processor import message_processor
|
from app.workers.message_worker import message_worker
|
|
|
router = APIRouter()
|
|
|
class CallbackData(BaseModel):
|
"""回调数据模型"""
|
|
account: Optional[str] = None
|
messageType: str
|
wcId: str
|
data: Dict[str, Any]
|
|
|
class CallbackResponse(BaseModel):
|
"""回调响应模型"""
|
|
success: bool
|
message: str
|
code: int = 200
|
|
|
@router.post("/callback", response_model=CallbackResponse)
|
async def receive_callback(callback_data: CallbackData):
|
"""
|
接收E云管家回调消息
|
|
Args:
|
callback_data: 回调数据
|
|
Returns:
|
处理结果
|
"""
|
try:
|
logger.info(
|
f"收到回调消息: messageType={callback_data.messageType}, wcId={callback_data.wcId}"
|
)
|
|
# 将Pydantic模型转换为字典
|
callback_dict = callback_data.model_dump()
|
|
# 将消息加入队列
|
success = message_processor.enqueue_callback_message(callback_dict)
|
|
logger.info(f"消息入队结果: success={success}")
|
|
if success:
|
# 获取发送用户ID
|
from_user = callback_dict.get("data", {}).get("fromUser")
|
if from_user:
|
logger.info(f"启动用户队列处理: from_user={from_user}")
|
# 启动用户队列处理
|
message_worker.process_user_queue(from_user)
|
|
return CallbackResponse(
|
success=True, message="消息已成功加入处理队列", code=200
|
)
|
else:
|
logger.warning("消息处理失败,未能加入队列")
|
return CallbackResponse(success=False, message="消息处理失败", code=400)
|
|
except Exception as e:
|
logger.error(f"回调处理异常: {str(e)}")
|
raise HTTPException(status_code=500, detail=f"服务器内部错误: {str(e)}")
|
|
|
@router.post("/callback/raw")
|
async def receive_raw_callback(request: Request):
|
"""
|
接收原始回调数据(用于调试)
|
|
Args:
|
request: 原始请求
|
|
Returns:
|
处理结果
|
"""
|
try:
|
# 获取原始JSON数据
|
raw_data = await request.json()
|
|
logger.info(f"收到原始回调数据: {raw_data}")
|
|
# 验证必要字段
|
if "messageType" not in raw_data or "data" not in raw_data:
|
raise HTTPException(status_code=400, detail="缺少必要字段")
|
|
# 将消息加入队列
|
success = message_processor.enqueue_callback_message(raw_data)
|
|
if success:
|
# 获取发送用户ID
|
from_user = raw_data.get("data", {}).get("fromUser")
|
if from_user:
|
# 启动用户队列处理
|
message_worker.process_user_queue(from_user)
|
|
return {"success": True, "message": "消息已成功加入处理队列", "code": 200}
|
else:
|
return {"success": False, "message": "消息处理失败", "code": 400}
|
|
except Exception as e:
|
logger.error(f"原始回调处理异常: {str(e)}")
|
raise HTTPException(status_code=500, detail=f"服务器内部错误: {str(e)}")
|
|
|
@router.get("/health")
|
async def health_check():
|
"""健康检查接口"""
|
return {
|
"status": "healthy",
|
"message": "服务运行正常",
|
"timestamp": int(time.time()),
|
}
|