""" 消息聚合管理接口 """ from fastapi import APIRouter, HTTPException from pydantic import BaseModel from typing import Dict, Any, Optional from loguru import logger from app.services.message_aggregator import message_aggregator router = APIRouter() class AggregationStatusResponse(BaseModel): """聚合状态响应模型""" success: bool message: str data: Dict[str, Any] class ForceProcessRequest(BaseModel): """强制处理请求模型""" from_user: str from_group: str class ForceProcessResponse(BaseModel): """强制处理响应模型""" success: bool message: str processed: bool @router.get("/status", response_model=AggregationStatusResponse) async def get_aggregation_status(): """ 获取消息聚合状态 Returns: 聚合状态信息 """ try: status = message_aggregator.get_aggregation_status() return AggregationStatusResponse( success=True, message="获取聚合状态成功", data=status ) except Exception as e: logger.error(f"获取聚合状态异常: {str(e)}") raise HTTPException(status_code=500, detail=f"获取聚合状态失败: {str(e)}") @router.post("/force-process", response_model=ForceProcessResponse) async def force_process_aggregation(request: ForceProcessRequest): """ 强制处理指定用户和群组的聚合消息 Args: request: 强制处理请求 Returns: 处理结果 """ try: logger.info( f"强制处理聚合消息请求: from_user={request.from_user}, from_group={request.from_group}" ) processed = message_aggregator.force_process_aggregation( request.from_user, request.from_group ) if processed: return ForceProcessResponse( success=True, message="聚合消息处理成功", processed=True ) else: return ForceProcessResponse( success=True, message="没有找到待聚合的消息", processed=False ) except Exception as e: logger.error( f"强制处理聚合消息异常: from_user={request.from_user}, from_group={request.from_group}, error={str(e)}" ) raise HTTPException(status_code=500, detail=f"强制处理聚合消息失败: {str(e)}") @router.get("/config") async def get_aggregation_config(): """ 获取消息聚合配置 Returns: 聚合配置信息 """ try: config = { "enabled": message_aggregator.aggregation_enabled, "timeout_seconds": message_aggregator.aggregation_timeout, } return {"success": True, "message": "获取聚合配置成功", "data": config} except Exception as e: logger.error(f"获取聚合配置异常: {str(e)}") raise HTTPException(status_code=500, detail=f"获取聚合配置失败: {str(e)}") @router.get("/health") async def aggregation_health_check(): """ 消息聚合服务健康检查 Returns: 健康状态 """ try: status = message_aggregator.get_aggregation_status() return { "success": True, "message": "消息聚合服务运行正常", "data": { "service_status": "healthy", "enabled": status["enabled"], "active_aggregations": status["active_aggregations"], }, } except Exception as e: logger.error(f"聚合服务健康检查异常: {str(e)}") raise HTTPException(status_code=500, detail=f"聚合服务健康检查失败: {str(e)}")