"""
|
消息聚合管理接口
|
"""
|
|
from fastapi import APIRouter, HTTPException
|
from pydantic import BaseModel
|
from typing import Dict, Any, Optional
|
from loguru import logger
|
|
from app.services.message_aggregator import message_aggregator
|
|
|
router = APIRouter()
|
|
|
class AggregationStatusResponse(BaseModel):
|
"""聚合状态响应模型"""
|
|
success: bool
|
message: str
|
data: Dict[str, Any]
|
|
|
class ForceProcessRequest(BaseModel):
|
"""强制处理请求模型"""
|
|
from_user: str
|
from_group: str
|
|
|
class ForceProcessResponse(BaseModel):
|
"""强制处理响应模型"""
|
|
success: bool
|
message: str
|
processed: bool
|
|
|
@router.get("/status", response_model=AggregationStatusResponse)
|
async def get_aggregation_status():
|
"""
|
获取消息聚合状态
|
|
Returns:
|
聚合状态信息
|
"""
|
try:
|
status = message_aggregator.get_aggregation_status()
|
|
return AggregationStatusResponse(
|
success=True, message="获取聚合状态成功", data=status
|
)
|
|
except Exception as e:
|
logger.error(f"获取聚合状态异常: {str(e)}")
|
raise HTTPException(status_code=500, detail=f"获取聚合状态失败: {str(e)}")
|
|
|
@router.post("/force-process", response_model=ForceProcessResponse)
|
async def force_process_aggregation(request: ForceProcessRequest):
|
"""
|
强制处理指定用户和群组的聚合消息
|
|
Args:
|
request: 强制处理请求
|
|
Returns:
|
处理结果
|
"""
|
try:
|
logger.info(
|
f"强制处理聚合消息请求: from_user={request.from_user}, from_group={request.from_group}"
|
)
|
|
processed = message_aggregator.force_process_aggregation(
|
request.from_user, request.from_group
|
)
|
|
if processed:
|
return ForceProcessResponse(
|
success=True, message="聚合消息处理成功", processed=True
|
)
|
else:
|
return ForceProcessResponse(
|
success=True, message="没有找到待聚合的消息", processed=False
|
)
|
|
except Exception as e:
|
logger.error(
|
f"强制处理聚合消息异常: from_user={request.from_user}, from_group={request.from_group}, error={str(e)}"
|
)
|
raise HTTPException(status_code=500, detail=f"强制处理聚合消息失败: {str(e)}")
|
|
|
@router.get("/config")
|
async def get_aggregation_config():
|
"""
|
获取消息聚合配置
|
|
Returns:
|
聚合配置信息
|
"""
|
try:
|
config = {
|
"enabled": message_aggregator.aggregation_enabled,
|
"timeout_seconds": message_aggregator.aggregation_timeout,
|
}
|
|
return {"success": True, "message": "获取聚合配置成功", "data": config}
|
|
except Exception as e:
|
logger.error(f"获取聚合配置异常: {str(e)}")
|
raise HTTPException(status_code=500, detail=f"获取聚合配置失败: {str(e)}")
|
|
|
@router.get("/health")
|
async def aggregation_health_check():
|
"""
|
消息聚合服务健康检查
|
|
Returns:
|
健康状态
|
"""
|
try:
|
status = message_aggregator.get_aggregation_status()
|
|
return {
|
"success": True,
|
"message": "消息聚合服务运行正常",
|
"data": {
|
"service_status": "healthy",
|
"enabled": status["enabled"],
|
"active_aggregations": status["active_aggregations"],
|
},
|
}
|
|
except Exception as e:
|
logger.error(f"聚合服务健康检查异常: {str(e)}")
|
raise HTTPException(status_code=500, detail=f"聚合服务健康检查失败: {str(e)}")
|