""" E云管家回调接口 """ import time from fastapi import APIRouter, HTTPException, Request from pydantic import BaseModel from typing import Dict, Any, Optional from loguru import logger from app.services.message_processor import message_processor from app.workers.message_worker import message_worker router = APIRouter() class CallbackData(BaseModel): """回调数据模型""" account: Optional[str] = None messageType: str wcId: str data: Dict[str, Any] class CallbackResponse(BaseModel): """回调响应模型""" success: bool message: str code: int = 200 @router.post("/callback", response_model=CallbackResponse) async def receive_callback(callback_data: CallbackData): """ 接收E云管家回调消息 Args: callback_data: 回调数据 Returns: 处理结果 """ try: logger.info( f"收到回调消息: messageType={callback_data.messageType}, wcId={callback_data.wcId}" ) # 将Pydantic模型转换为字典 callback_dict = callback_data.model_dump() # 将消息加入队列 success = message_processor.enqueue_callback_message(callback_dict) logger.info(f"消息入队结果: success={success}") if success: # 获取发送用户ID from_user = callback_dict.get("data", {}).get("fromUser") if from_user: logger.info(f"启动用户队列处理: from_user={from_user}") # 启动用户队列处理 message_worker.process_user_queue(from_user) return CallbackResponse( success=True, message="消息已成功加入处理队列", code=200 ) else: logger.warning("消息处理失败,未能加入队列") return CallbackResponse(success=False, message="消息处理失败", code=400) except Exception as e: logger.error(f"回调处理异常: {str(e)}") raise HTTPException(status_code=500, detail=f"服务器内部错误: {str(e)}") @router.post("/callback/raw") async def receive_raw_callback(request: Request): """ 接收原始回调数据(用于调试) Args: request: 原始请求 Returns: 处理结果 """ try: # 获取原始JSON数据 raw_data = await request.json() logger.info(f"收到原始回调数据: {raw_data}") # 验证必要字段 if "messageType" not in raw_data or "data" not in raw_data: raise HTTPException(status_code=400, detail="缺少必要字段") # 将消息加入队列 success = message_processor.enqueue_callback_message(raw_data) if success: # 获取发送用户ID from_user = raw_data.get("data", {}).get("fromUser") if from_user: # 启动用户队列处理 message_worker.process_user_queue(from_user) return {"success": True, "message": "消息已成功加入处理队列", "code": 200} else: return {"success": False, "message": "消息处理失败", "code": 400} except Exception as e: logger.error(f"原始回调处理异常: {str(e)}") raise HTTPException(status_code=500, detail=f"服务器内部错误: {str(e)}") @router.get("/health") async def health_check(): """健康检查接口""" return { "status": "healthy", "message": "服务运行正常", "timestamp": int(time.time()), }