yj
2025-09-03 f28ac0166536a2a4b68cac685a41ea667f60f7e9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
消息聚合管理接口
"""
 
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Dict, Any, Optional
from loguru import logger
 
from app.services.message_aggregator import message_aggregator
 
 
router = APIRouter()
 
 
class AggregationStatusResponse(BaseModel):
    """聚合状态响应模型"""
 
    success: bool
    message: str
    data: Dict[str, Any]
 
 
class ForceProcessRequest(BaseModel):
    """强制处理请求模型"""
 
    from_user: str
    from_group: str
 
 
class ForceProcessResponse(BaseModel):
    """强制处理响应模型"""
 
    success: bool
    message: str
    processed: bool
 
 
@router.get("/status", response_model=AggregationStatusResponse)
async def get_aggregation_status():
    """
    获取消息聚合状态
 
    Returns:
        聚合状态信息
    """
    try:
        status = message_aggregator.get_aggregation_status()
 
        return AggregationStatusResponse(
            success=True, message="获取聚合状态成功", data=status
        )
 
    except Exception as e:
        logger.error(f"获取聚合状态异常: {str(e)}")
        raise HTTPException(status_code=500, detail=f"获取聚合状态失败: {str(e)}")
 
 
@router.post("/force-process", response_model=ForceProcessResponse)
async def force_process_aggregation(request: ForceProcessRequest):
    """
    强制处理指定用户和群组的聚合消息
 
    Args:
        request: 强制处理请求
 
    Returns:
        处理结果
    """
    try:
        logger.info(
            f"强制处理聚合消息请求: from_user={request.from_user}, from_group={request.from_group}"
        )
 
        processed = message_aggregator.force_process_aggregation(
            request.from_user, request.from_group
        )
 
        if processed:
            return ForceProcessResponse(
                success=True, message="聚合消息处理成功", processed=True
            )
        else:
            return ForceProcessResponse(
                success=True, message="没有找到待聚合的消息", processed=False
            )
 
    except Exception as e:
        logger.error(
            f"强制处理聚合消息异常: from_user={request.from_user}, from_group={request.from_group}, error={str(e)}"
        )
        raise HTTPException(status_code=500, detail=f"强制处理聚合消息失败: {str(e)}")
 
 
@router.get("/config")
async def get_aggregation_config():
    """
    获取消息聚合配置
 
    Returns:
        聚合配置信息
    """
    try:
        config = {
            "enabled": message_aggregator.aggregation_enabled,
            "timeout_seconds": message_aggregator.aggregation_timeout,
        }
 
        return {"success": True, "message": "获取聚合配置成功", "data": config}
 
    except Exception as e:
        logger.error(f"获取聚合配置异常: {str(e)}")
        raise HTTPException(status_code=500, detail=f"获取聚合配置失败: {str(e)}")
 
 
@router.get("/health")
async def aggregation_health_check():
    """
    消息聚合服务健康检查
 
    Returns:
        健康状态
    """
    try:
        status = message_aggregator.get_aggregation_status()
 
        return {
            "success": True,
            "message": "消息聚合服务运行正常",
            "data": {
                "service_status": "healthy",
                "enabled": status["enabled"],
                "active_aggregations": status["active_aggregations"],
            },
        }
 
    except Exception as e:
        logger.error(f"聚合服务健康检查异常: {str(e)}")
        raise HTTPException(status_code=500, detail=f"聚合服务健康检查失败: {str(e)}")