"""
|
静默模式管理API
|
"""
|
|
from fastapi import APIRouter, HTTPException, Query
|
from pydantic import BaseModel
|
from typing import Optional
|
from loguru import logger
|
|
from app.services.silence_service import silence_service
|
from config import settings
|
|
|
router = APIRouter()
|
|
|
class SilenceModeResponse(BaseModel):
|
"""静默模式响应模型"""
|
success: bool
|
message: str
|
data: Optional[dict] = None
|
|
|
class ActivateSilenceRequest(BaseModel):
|
"""激活静默模式请求模型"""
|
group_id: str
|
|
|
@router.get("/silence-mode/status", response_model=SilenceModeResponse)
|
async def get_silence_mode_status(group_id: Optional[str] = None):
|
"""
|
获取静默模式状态
|
|
Args:
|
group_id: 群组ID,如果不提供则返回全局状态概览
|
|
Returns:
|
静默模式状态信息
|
"""
|
try:
|
status = silence_service.get_silence_status(group_id)
|
|
message = "获取静默模式状态成功"
|
if group_id:
|
message = f"获取群组静默模式状态成功: {group_id}"
|
|
return SilenceModeResponse(
|
success=True,
|
message=message,
|
data=status
|
)
|
|
except Exception as e:
|
logger.error(f"获取静默模式状态失败: group_id={group_id}, error={str(e)}")
|
raise HTTPException(status_code=500, detail=f"获取静默模式状态失败: {str(e)}")
|
|
|
@router.post("/silence-mode/activate", response_model=SilenceModeResponse)
|
async def activate_silence_mode(request: ActivateSilenceRequest):
|
"""
|
手动激活指定群组的静默模式
|
|
Args:
|
request: 激活请求,包含群组ID
|
|
Returns:
|
操作结果
|
"""
|
try:
|
if not settings.silence_mode_enabled:
|
raise HTTPException(status_code=400, detail="静默模式功能已禁用")
|
|
success = silence_service.activate_silence_mode(request.group_id)
|
|
if success:
|
status = silence_service.get_silence_status(request.group_id)
|
return SilenceModeResponse(
|
success=True,
|
message=f"群组静默模式已激活: {request.group_id},持续时间: {settings.silence_duration_minutes} 分钟",
|
data=status
|
)
|
else:
|
raise HTTPException(status_code=400, detail=f"激活群组静默模式失败: {request.group_id}")
|
|
except HTTPException:
|
raise
|
except Exception as e:
|
logger.error(f"激活群组静默模式失败: group_id={request.group_id}, error={str(e)}")
|
raise HTTPException(status_code=500, detail=f"激活群组静默模式失败: {str(e)}")
|
|
|
class ExtendSilenceRequest(BaseModel):
|
"""延长静默模式请求模型"""
|
group_id: str
|
|
|
@router.post("/silence-mode/extend", response_model=SilenceModeResponse)
|
async def extend_silence_mode(request: ExtendSilenceRequest):
|
"""
|
延长指定群组的静默模式时间
|
|
Args:
|
request: 延长请求,包含群组ID
|
|
Returns:
|
操作结果
|
"""
|
try:
|
if not settings.silence_mode_enabled:
|
raise HTTPException(status_code=400, detail="静默模式功能已禁用")
|
|
success = silence_service.extend_silence_mode(request.group_id)
|
|
if success:
|
status = silence_service.get_silence_status(request.group_id)
|
return SilenceModeResponse(
|
success=True,
|
message=f"群组静默模式时间已刷新: {request.group_id},持续时间: {settings.silence_duration_minutes} 分钟",
|
data=status
|
)
|
else:
|
raise HTTPException(status_code=400, detail=f"延长群组静默模式失败: {request.group_id}")
|
|
except HTTPException:
|
raise
|
except Exception as e:
|
logger.error(f"延长群组静默模式失败: group_id={request.group_id}, error={str(e)}")
|
raise HTTPException(status_code=500, detail=f"延长群组静默模式失败: {str(e)}")
|
|
|
class DeactivateSilenceRequest(BaseModel):
|
"""停用静默模式请求模型"""
|
group_id: Optional[str] = None # 如果为None则停用所有群组
|
|
|
@router.post("/silence-mode/deactivate", response_model=SilenceModeResponse)
|
async def deactivate_silence_mode(request: DeactivateSilenceRequest):
|
"""
|
手动停用静默模式
|
|
Args:
|
request: 停用请求,group_id为None时停用所有群组
|
|
Returns:
|
操作结果
|
"""
|
try:
|
success = silence_service.deactivate_silence_mode(request.group_id)
|
|
if success:
|
status = silence_service.get_silence_status()
|
message = "所有群组的静默模式已停用" if not request.group_id else f"群组静默模式已停用: {request.group_id}"
|
|
return SilenceModeResponse(
|
success=True,
|
message=message,
|
data=status
|
)
|
else:
|
detail = "停用静默模式失败" if not request.group_id else f"停用群组静默模式失败: {request.group_id}"
|
raise HTTPException(status_code=400, detail=detail)
|
|
except HTTPException:
|
raise
|
except Exception as e:
|
logger.error(f"停用静默模式失败: group_id={request.group_id}, error={str(e)}")
|
raise HTTPException(status_code=500, detail=f"停用静默模式失败: {str(e)}")
|
|
|
@router.get("/silence-mode/config")
|
async def get_silence_mode_config():
|
"""
|
获取静默模式配置信息
|
|
Returns:
|
配置信息
|
"""
|
try:
|
return {
|
"success": True,
|
"data": {
|
"enabled": settings.silence_mode_enabled,
|
"duration_minutes": settings.silence_duration_minutes,
|
"description": "群组静默模式功能:当好友消息被忽略时,自动激活该群组的静默模式,在指定时间内不处理该群组的消息"
|
},
|
"message": "获取静默模式配置信息成功"
|
}
|
|
except Exception as e:
|
logger.error(f"获取静默模式配置信息失败: {str(e)}")
|
raise HTTPException(status_code=500, detail=f"获取静默模式配置信息失败: {str(e)}")
|