yj
2025-07-28 99266ea57913663f9880c512726c42cb7e5e7f28
新增忽略好友消息;删除多余文件
4个文件已添加
9个文件已修改
14个文件已删除
2748 ■■■■ 已修改文件
.env.example 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
CONFIG_GUIDE.md 103 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
DEPLOYMENT.md 246 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
DEPLOYMENT_WINDOWS.md 186 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
Dockerfile 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
README.md 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/api/friend_ignore.py 241 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/services/contact_sync.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/services/dify_client.py 310 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/services/friend_ignore_service.py 259 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/services/message_processor.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.example.json 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.production.json 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.py 99 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
docker-compose.yml 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
install_service.bat 102 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
logs/app.log 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
start.bat 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
start.sh 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
startup.py 139 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_integration.py 178 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tests/test_dify_streaming.py 174 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tests/test_friend_ignore_service.py 258 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tests/test_message_processor.py 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
uninstall_service.bat 61 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.env.example
File was deleted
.gitignore
@@ -1,5 +1,6 @@
# 忽略所有 .log 文件
*.log
logs/*.log
# 忽略特定文件
config.ini
@@ -31,4 +32,5 @@
# 同时忽略所有 .pyc 文件(可选但推荐)
*.pyc
*.pyo
*.pyd
*.pyd
CONFIG_GUIDE.md
File was deleted
DEPLOYMENT.md
File was deleted
DEPLOYMENT_WINDOWS.md
File was deleted
Dockerfile
File was deleted
README.md
@@ -8,7 +8,9 @@
- 过滤和验证消息(仅处理群聊消息,忽略自己发送的消息)
- 使用Redis队列管理用户消息,防止并发处理
- 自动获取和保存联系人信息
- 调用DifyAI接口获取AI回答
- **支持DifyAI流式和阻塞两种模式**
  - 流式模式:实时接收AI回复,响应更快
  - 阻塞模式:等待完整回复后返回,更稳定
- 将AI回答发送回群聊
- 完整的日志记录和错误处理
@@ -154,11 +156,34 @@
## 配置说明
主要配置项在 `config.py` 中:
主要配置项在 `config.json` 中:
- `max_retry_count`: 最大重试次数(默认3次)
- `retry_delay`: 重试延迟(默认5秒)
- `queue_timeout`: 队列超时时间(默认300秒)
### DifyAI配置
- `dify.streaming_enabled`: 是否启用流式模式(默认:false)
- `dify.streaming_timeout`: 流式请求超时时间,单位秒(默认:120)
- `dify.base_url`: DifyAI API地址
- `dify.api_key`: DifyAI API密钥
### 消息处理配置
- `message_processing.max_retry_count`: 最大重试次数(默认3次)
- `message_processing.retry_delay`: 重试延迟(默认5秒)
- `message_processing.queue_timeout`: 队列超时时间(默认300秒)
### 流式模式说明
**启用流式模式的优势:**
- 实时响应:边生成边返回,用户体验更好
- 更快感知:无需等待完整回复即可开始处理
- 连接保活:自动处理ping事件,保持连接稳定
**配置示例:**
```json
{
  "dify": {
    "streaming_enabled": true,
    "streaming_timeout": 180
  }
}
```
## 日志管理
app/api/friend_ignore.py
New file
@@ -0,0 +1,241 @@
"""
好友忽略列表管理API
"""
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List, Set
from loguru import logger
from app.services.friend_ignore_service import friend_ignore_service
from app.services.contact_sync import contact_sync_service
from config import settings
router = APIRouter()
class AddFriendsRequest(BaseModel):
    """添加好友到忽略列表请求模型"""
    friends: List[str]
class RemoveFriendRequest(BaseModel):
    """从忽略列表移除好友请求模型"""
    w_id: str
class IgnoreListResponse(BaseModel):
    """忽略列表响应模型"""
    success: bool
    message: str
    data: Set[str] = None
    count: int = 0
@router.get("/ignore-list", response_model=IgnoreListResponse)
async def get_ignore_list():
    """
    获取当前的好友忽略列表
    Returns:
        忽略列表响应
    """
    try:
        ignore_list = friend_ignore_service.get_ignore_list()
        count = friend_ignore_service.get_ignore_list_count()
        return IgnoreListResponse(
            success=True,
            message="获取忽略列表成功",
            data=ignore_list,
            count=count
        )
    except Exception as e:
        logger.error(f"获取忽略列表失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"获取忽略列表失败: {str(e)}")
@router.post("/ignore-list/add", response_model=IgnoreListResponse)
async def add_friends_to_ignore_list(request: AddFriendsRequest):
    """
    添加好友到忽略列表
    Args:
        request: 添加好友请求
    Returns:
        操作结果
    """
    try:
        success = friend_ignore_service.add_friends_to_ignore_list(request.friends)
        if success:
            count = friend_ignore_service.get_ignore_list_count()
            return IgnoreListResponse(
                success=True,
                message=f"成功添加 {len(request.friends)} 个好友到忽略列表",
                count=count
            )
        else:
            raise HTTPException(status_code=400, detail="添加好友到忽略列表失败")
    except Exception as e:
        logger.error(f"添加好友到忽略列表失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"添加好友到忽略列表失败: {str(e)}")
@router.post("/ignore-list/remove", response_model=IgnoreListResponse)
async def remove_friend_from_ignore_list(request: RemoveFriendRequest):
    """
    从忽略列表中移除好友
    Args:
        request: 移除好友请求
    Returns:
        操作结果
    """
    try:
        success = friend_ignore_service.remove_friend_from_ignore_list(request.w_id)
        if success:
            count = friend_ignore_service.get_ignore_list_count()
            return IgnoreListResponse(
                success=True,
                message=f"成功从忽略列表中移除好友: {request.w_id}",
                count=count
            )
        else:
            raise HTTPException(status_code=400, detail="从忽略列表移除好友失败")
    except Exception as e:
        logger.error(f"从忽略列表移除好友失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"从忽略列表移除好友失败: {str(e)}")
@router.delete("/ignore-list", response_model=IgnoreListResponse)
async def clear_ignore_list():
    """
    清空忽略列表
    Returns:
        操作结果
    """
    try:
        success = friend_ignore_service.clear_ignore_list()
        if success:
            return IgnoreListResponse(
                success=True,
                message="成功清空忽略列表",
                count=0
            )
        else:
            raise HTTPException(status_code=400, detail="清空忽略列表失败")
    except Exception as e:
        logger.error(f"清空忽略列表失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"清空忽略列表失败: {str(e)}")
@router.post("/sync-contacts", response_model=IgnoreListResponse)
async def sync_contacts_and_rebuild_ignore_list():
    """
    重新同步联系人并重建忽略列表
    Returns:
        操作结果
    """
    try:
        if not settings.ecloud_w_id:
            raise HTTPException(status_code=400, detail="未配置ecloud_w_id")
        success = contact_sync_service.sync_contacts_on_startup(settings.ecloud_w_id)
        if success:
            count = friend_ignore_service.get_ignore_list_count()
            return IgnoreListResponse(
                success=True,
                message="联系人同步完成,忽略列表已重建",
                count=count
            )
        else:
            raise HTTPException(status_code=400, detail="联系人同步失败")
    except Exception as e:
        logger.error(f"联系人同步失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"联系人同步失败: {str(e)}")
@router.get("/ignore-list/check/{w_id}")
async def check_if_friend_ignored(w_id: str):
    """
    检查指定w_id的详细忽略状态
    Args:
        w_id: 要检查的w_id
    Returns:
        详细的检查结果
    """
    try:
        status_info = friend_ignore_service.get_ignore_status_info(w_id)
        return {
            "success": True,
            "data": status_info,
            "message": f"w_id {w_id} 状态检查完成"
        }
    except Exception as e:
        logger.error(f"检查忽略状态失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"检查忽略状态失败: {str(e)}")
@router.get("/whitelist")
async def get_whitelist():
    """
    获取当前的白名单列表
    Returns:
        白名单列表
    """
    try:
        whitelist = friend_ignore_service.get_whitelist()
        return {
            "success": True,
            "data": whitelist,
            "count": len(whitelist),
            "message": "获取白名单成功"
        }
    except Exception as e:
        logger.error(f"获取白名单失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"获取白名单失败: {str(e)}")
@router.get("/config")
async def get_ignore_config():
    """
    获取好友忽略功能的配置信息
    Returns:
        配置信息
    """
    try:
        return {
            "success": True,
            "data": {
                "ignore_enabled": settings.friend_ignore_enabled,
                "whitelist": settings.friend_ignore_whitelist,
                "whitelist_count": len(settings.friend_ignore_whitelist),
                "ignore_list_count": friend_ignore_service.get_ignore_list_count()
            },
            "message": "获取配置信息成功"
        }
    except Exception as e:
        logger.error(f"获取配置信息失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"获取配置信息失败: {str(e)}")
app/services/contact_sync.py
@@ -8,6 +8,7 @@
from app.models.contact import Contact
from app.models.database import get_db
from app.services.ecloud_client import ecloud_client
from app.services.friend_ignore_service import friend_ignore_service
class ContactSyncService:
@@ -48,7 +49,10 @@
            logger.info(f"获取到好友列表: wId={w_id}, count={len(friends)}")
            # 4. 批量获取联系人详细信息
            # 4. 将好友w_id添加到Redis忽略列表
            friend_ignore_service.add_friends_to_ignore_list(friends)
            # 5. 批量获取联系人详细信息
            return self._batch_sync_contacts(w_id, friends)
        except Exception as e:
app/services/dify_client.py
@@ -4,6 +4,7 @@
import requests
import time
import json
from typing import List, Dict, Optional, Any
from loguru import logger
from config import settings
@@ -98,6 +99,315 @@
        )
        return None
    def send_chat_message_stream(
        self,
        query: str,
        user: str,
        conversation_id: Optional[str] = None,
        max_retries: int = None,
    ) -> Optional[Dict[str, Any]]:
        """
        发送对话消息(流式模式)
        Args:
            query: 用户输入/提问内容
            user: 用户标识
            conversation_id: 会话ID(可选)
            max_retries: 最大重试次数
        Returns:
            完整的响应数据字典,失败返回None
        """
        if max_retries is None:
            max_retries = settings.max_retry_count
        url = f"{self.base_url}/chat-messages"
        payload = {
            "query": query,
            "response_mode": "streaming",  # 使用流式模式
            "user": user,
            "inputs": {},
        }
        # 如果有会话ID,添加到请求中
        if conversation_id:
            payload["conversation_id"] = conversation_id
        retry_count = 0
        while retry_count <= max_retries:
            try:
                logger.info(
                    f"发送Dify流式消息: user={user}, conversation_id={conversation_id}, retry={retry_count}"
                )
                response = self.session.post(
                    url,
                    json=payload,
                    timeout=settings.dify_streaming_timeout,
                    stream=True
                )
                response.raise_for_status()
                # 处理流式响应
                result = self._process_stream_response(response, user)
                if result:
                    logger.info(
                        f"Dify流式消息发送成功: user={user}, conversation_id={result.get('conversation_id')}"
                    )
                    return result
                else:
                    logger.error(f"Dify流式响应处理失败: user={user}")
            except requests.exceptions.Timeout:
                logger.warning(f"Dify流式请求超时: user={user}, retry={retry_count}")
            except requests.exceptions.RequestException as e:
                logger.error(
                    f"Dify流式网络错误: user={user}, retry={retry_count}, error={str(e)}"
                )
            except Exception as e:
                logger.error(
                    f"Dify流式请求异常: user={user}, retry={retry_count}, error={str(e)}"
                )
            retry_count += 1
            if retry_count <= max_retries:
                wait_time = settings.retry_delay * retry_count
                logger.info(f"等待重试: user={user}, wait_time={wait_time}s")
                time.sleep(wait_time)
        logger.error(
            f"Dify流式消息发送失败,已达最大重试次数: user={user}, max_retries={max_retries}"
        )
        return None
    def _process_stream_response(self, response: requests.Response, user: str) -> Optional[Dict[str, Any]]:
        """
        处理流式响应
        Args:
            response: requests响应对象
            user: 用户标识
        Returns:
            完整的响应数据字典,失败返回None
        """
        try:
            # 检查响应头
            content_type = response.headers.get('content-type', '')
            if 'text/event-stream' not in content_type:
                logger.warning(f"响应不是SSE格式: user={user}, content_type={content_type}")
            complete_answer = ""
            conversation_id = ""
            task_id = ""
            message_id = ""
            created_at = None
            metadata = None
            usage = None
            retriever_resources = None
            message_ended = False  # 标记是否收到message_end事件
            logger.info(f"开始处理流式响应: user={user}")
            # 添加超时和行计数器
            line_count = 0
            max_empty_lines = 50  # 最大连续空行数,防止无限循环
            for line in response.iter_lines(decode_unicode=True):
                line_count += 1
                if not line:
                    # 空行计数,防止无限等待
                    if line_count > max_empty_lines and not complete_answer:
                        logger.warning(f"流式响应过多空行,可能连接异常: user={user}, line_count={line_count}")
                        break
                    continue
                # 跳过非数据行
                if not line.startswith("data: "):
                    continue
                # 提取JSON数据
                data_str = line[6:]  # 移除 "data: " 前缀
                if not data_str.strip():
                    continue
                try:
                    data = json.loads(data_str)
                    event = data.get("event", "")
                    if event == "message":
                        # 消息事件 - 累积答案内容
                        answer_chunk = data.get("answer", "")
                        complete_answer += answer_chunk
                        # 保存基本信息
                        if not conversation_id:
                            conversation_id = data.get("conversation_id", "")
                        if not task_id:
                            task_id = data.get("task_id", "")
                        if not message_id:
                            message_id = data.get("id", "")
                        if created_at is None:
                            created_at = data.get("created_at")
                        logger.debug(f"收到消息块: user={user}, chunk_length={len(answer_chunk)}")
                    elif event == "message_end":
                        # 消息结束事件 - 获取元数据
                        metadata = data.get("metadata")
                        usage = data.get("usage")
                        retriever_resources = data.get("retriever_resources")
                        message_ended = True
                        logger.info(f"流式响应完成: user={user}, total_length={len(complete_answer)}")
                        break
                    elif event == "message_file":
                        # 文件事件 - 记录但继续处理
                        logger.debug(f"收到文件事件: user={user}, file_type={data.get('type')}")
                        continue
                    elif event == "message_replace":
                        # 消息替换事件 - 替换答案内容
                        replace_answer = data.get("answer", "")
                        if replace_answer:
                            complete_answer = replace_answer
                            logger.info(f"消息内容被替换: user={user}, new_length={len(complete_answer)}")
                        continue
                    elif event in ["workflow_started", "node_started", "node_finished", "workflow_finished"]:
                        # 工作流相关事件 - 记录但继续处理
                        logger.debug(f"收到工作流事件: user={user}, event={event}")
                        continue
                    elif event in ["tts_message", "tts_message_end"]:
                        # TTS音频事件 - 记录但继续处理
                        logger.debug(f"收到TTS事件: user={user}, event={event}")
                        continue
                    elif event in ["agent_thought", "agent_message"]:
                        # Agent相关事件 - 需要特殊处理
                        logger.debug(f"收到Agent事件: user={user}, event={event}, data_keys={list(data.keys())}")
                        # 从agent事件中提取conversation_id(如果有的话)
                        if not conversation_id and data.get("conversation_id"):
                            conversation_id = data.get("conversation_id")
                            logger.info(f"从Agent事件获取conversation_id: user={user}, conversation_id={conversation_id}")
                        # 从agent事件中提取基本信息(如果有的话)
                        if not task_id and data.get("task_id"):
                            task_id = data.get("task_id")
                            logger.debug(f"从Agent事件获取task_id: user={user}, task_id={task_id}")
                        if not message_id and data.get("id"):
                            message_id = data.get("id")
                            logger.debug(f"从Agent事件获取message_id: user={user}, message_id={message_id}")
                        if created_at is None and data.get("created_at"):
                            created_at = data.get("created_at")
                            logger.debug(f"从Agent事件获取created_at: user={user}, created_at={created_at}")
                        # 检查agent_message是否包含answer内容
                        if event == "agent_message" and data.get("answer"):
                            agent_answer = data.get("answer", "")
                            complete_answer += agent_answer
                            logger.debug(f"从Agent消息获取内容: user={user}, chunk_length={len(agent_answer)}")
                        continue
                    elif event == "error":
                        # 错误事件
                        error_msg = data.get("message", "未知错误")
                        logger.error(f"流式响应错误: user={user}, error={error_msg}")
                        return None
                    elif event == "ping":
                        # ping事件 - 保持连接
                        logger.debug(f"收到ping事件: user={user}")
                        continue
                    else:
                        # 未知事件类型 - 记录但继续处理
                        logger.debug(f"收到未知事件: user={user}, event={event}")
                        continue
                except json.JSONDecodeError as e:
                    logger.warning(f"解析流式数据JSON失败: user={user}, data={data_str}, error={str(e)}")
                    continue
            # 构建完整响应
            # 对于Agent模式,可能没有conversation_id,但有task_id
            # 在这种情况下,我们可以使用task_id作为conversation_id的替代
            if conversation_id or (task_id and (complete_answer or message_ended)):
                # 如果没有conversation_id但有task_id,使用task_id
                final_conversation_id = conversation_id or task_id
                result = {
                    "event": "message",
                    "task_id": task_id,
                    "id": message_id,
                    "message_id": message_id,
                    "conversation_id": final_conversation_id,
                    "mode": "chat",
                    "answer": complete_answer,  # 可能为空字符串
                    "created_at": created_at
                }
                # 添加可选字段
                if metadata:
                    result["metadata"] = metadata
                if usage:
                    result["usage"] = usage
                if retriever_resources:
                    result["retriever_resources"] = retriever_resources
                if complete_answer:
                    logger.info(f"流式响应处理成功: user={user}, answer_length={len(complete_answer)}, conversation_id={final_conversation_id}, message_ended={message_ended}")
                else:
                    logger.info(f"流式响应处理成功(无内容): user={user}, conversation_id={final_conversation_id}, message_ended={message_ended}")
                return result
            else:
                logger.error(f"流式响应不完整: user={user}, answer={bool(complete_answer)}, conversation_id={bool(conversation_id)}, task_id={bool(task_id)}")
                # 记录更多调试信息
                logger.debug(f"调试信息: task_id={task_id}, message_id={message_id}, created_at={created_at}, message_ended={message_ended}")
                return None
        except Exception as e:
            logger.error(f"处理流式响应异常: user={user}, error={str(e)}")
            return None
    def send_message(
        self,
        query: str,
        user: str,
        conversation_id: Optional[str] = None,
        max_retries: int = None,
        force_streaming: Optional[bool] = None,
    ) -> Optional[Dict[str, Any]]:
        """
        发送对话消息(根据配置选择模式)
        Args:
            query: 用户输入/提问内容
            user: 用户标识
            conversation_id: 会话ID(可选)
            max_retries: 最大重试次数
            force_streaming: 强制使用流式模式(可选,覆盖配置)
        Returns:
            响应数据字典,失败返回None
        """
        # 确定使用哪种模式
        use_streaming = force_streaming if force_streaming is not None else settings.dify_streaming_enabled
        if use_streaming:
            logger.info(f"使用流式模式发送消息: user={user}")
            return self.send_chat_message_stream(query, user, conversation_id, max_retries)
        else:
            logger.info(f"使用阻塞模式发送消息: user={user}")
            return self.send_chat_message(query, user, conversation_id, max_retries)
    def get_conversation_messages(
        self, conversation_id: str, user: str
    ) -> Optional[List[Dict[str, Any]]]:
app/services/friend_ignore_service.py
New file
@@ -0,0 +1,259 @@
"""
好友忽略列表管理服务
"""
from typing import List, Set, Optional
from loguru import logger
from sqlalchemy.orm import Session
from app.services.redis_queue import redis_queue
from app.models.database import get_db
from app.models.contact import Contact
from config import settings
class FriendIgnoreService:
    """好友忽略列表管理服务"""
    def __init__(self):
        self.ignore_list_key = "ecloud_ignore_friends"
    def _get_wid_by_nickname(self, nickname: str) -> Optional[str]:
        """
        根据昵称获取w_id
        Args:
            nickname: 好友昵称
        Returns:
            对应的w_id,如果未找到返回None
        """
        try:
            with next(get_db()) as db:
                contact = db.query(Contact).filter(Contact.nick_name == nickname).first()
                if contact:
                    return contact.wc_id
                else:
                    logger.warning(f"未找到昵称为 '{nickname}' 的联系人")
                    return None
        except Exception as e:
            logger.error(f"根据昵称查找w_id异常: nickname={nickname}, error={str(e)}")
            return None
    def _get_whitelist_wids(self) -> List[str]:
        """
        将配置中的昵称白名单转换为w_id列表
        Returns:
            w_id列表
        """
        wid_list = []
        for nickname in settings.friend_ignore_whitelist:
            wid = self._get_wid_by_nickname(nickname)
            if wid:
                wid_list.append(wid)
                logger.debug(f"白名单昵称 '{nickname}' 对应w_id: {wid}")
            else:
                logger.warning(f"白名单昵称 '{nickname}' 未找到对应的联系人")
        return wid_list
    def add_friends_to_ignore_list(self, friends: List[str]) -> bool:
        """
        将好友w_id添加到Redis忽略列表
        Args:
            friends: 好友w_id列表
        Returns:
            添加成功返回True,失败返回False
        """
        try:
            if not friends:
                logger.info("好友列表为空,无需添加到忽略列表")
                return True
            # 清空现有的忽略列表
            redis_queue.redis_client.delete(self.ignore_list_key)
            # 批量添加好友w_id到忽略列表
            redis_queue.redis_client.sadd(self.ignore_list_key, *friends)
            logger.info(f"已将 {len(friends)} 个好友添加到忽略列表")
            return True
        except Exception as e:
            logger.error(f"添加好友到忽略列表异常: error={str(e)}")
            return False
    def is_friend_ignored(self, w_id: str) -> bool:
        """
        检查指定w_id是否应该被忽略
        逻辑:
        1. 如果好友忽略功能未启用,返回False(不忽略)
        2. 如果w_id在白名单中,返回False(不忽略)
        3. 如果w_id在忽略列表中,返回True(忽略)
        4. 如果w_id不在忽略列表中,返回False(不忽略)
        Args:
            w_id: 用户w_id
        Returns:
            如果应该被忽略返回True,否则返回False
        """
        try:
            # 检查好友忽略功能是否启用
            if not settings.friend_ignore_enabled:
                logger.debug(f"好友忽略功能已禁用,不忽略消息: w_id={w_id}")
                return False
            # 检查是否在白名单中(通过昵称)
            whitelist_wids = self._get_whitelist_wids()
            if w_id in whitelist_wids:
                logger.info(f"w_id在白名单中,不忽略消息: w_id={w_id}")
                return False
            # 检查是否在忽略列表中
            is_in_ignore_list = redis_queue.redis_client.sismember(self.ignore_list_key, w_id)
            if is_in_ignore_list:
                logger.info(f"w_id在忽略列表中,忽略消息: w_id={w_id}")
            return is_in_ignore_list
        except Exception as e:
            logger.error(f"检查忽略列表异常: w_id={w_id}, error={str(e)}")
            return False
    def get_ignore_list(self) -> Set[str]:
        """
        获取完整的忽略列表
        Returns:
            忽略列表中的所有w_id集合
        """
        try:
            return redis_queue.redis_client.smembers(self.ignore_list_key)
        except Exception as e:
            logger.error(f"获取忽略列表异常: error={str(e)}")
            return set()
    def remove_friend_from_ignore_list(self, w_id: str) -> bool:
        """
        从忽略列表中移除指定w_id
        Args:
            w_id: 用户w_id
        Returns:
            移除成功返回True,失败返回False
        """
        try:
            result = redis_queue.redis_client.srem(self.ignore_list_key, w_id)
            if result:
                logger.info(f"已从忽略列表中移除: w_id={w_id}")
            else:
                logger.info(f"w_id不在忽略列表中: w_id={w_id}")
            return True
        except Exception as e:
            logger.error(f"从忽略列表移除w_id异常: w_id={w_id}, error={str(e)}")
            return False
    def clear_ignore_list(self) -> bool:
        """
        清空忽略列表
        Returns:
            清空成功返回True,失败返回False
        """
        try:
            redis_queue.redis_client.delete(self.ignore_list_key)
            logger.info("已清空忽略列表")
            return True
        except Exception as e:
            logger.error(f"清空忽略列表异常: error={str(e)}")
            return False
    def get_ignore_list_count(self) -> int:
        """
        获取忽略列表中的好友数量
        Returns:
            忽略列表中的好友数量
        """
        try:
            return redis_queue.redis_client.scard(self.ignore_list_key)
        except Exception as e:
            logger.error(f"获取忽略列表数量异常: error={str(e)}")
            return 0
    def get_whitelist(self) -> List[str]:
        """
        获取当前的白名单列表
        Returns:
            白名单中的w_id列表
        """
        return settings.friend_ignore_whitelist.copy()
    def is_whitelist_enabled(self) -> bool:
        """
        检查白名单功能是否启用
        Returns:
            如果启用返回True,否则返回False
        """
        return settings.friend_ignore_enabled
    def get_ignore_status_info(self, w_id: str) -> dict:
        """
        获取指定w_id的详细忽略状态信息
        Args:
            w_id: 用户w_id
        Returns:
            包含详细状态信息的字典
        """
        try:
            # 获取白名单w_id列表
            whitelist_wids = self._get_whitelist_wids()
            info = {
                "w_id": w_id,
                "ignore_enabled": settings.friend_ignore_enabled,
                "in_whitelist": w_id in whitelist_wids,
                "in_ignore_list": False,
                "final_ignored": False,
                "reason": "",
                "whitelist_nicknames": settings.friend_ignore_whitelist
            }
            if not settings.friend_ignore_enabled:
                info["reason"] = "好友忽略功能已禁用"
                return info
            if w_id in whitelist_wids:
                info["reason"] = "在白名单中,不会被忽略"
                return info
            info["in_ignore_list"] = redis_queue.redis_client.sismember(self.ignore_list_key, w_id)
            if info["in_ignore_list"]:
                info["final_ignored"] = True
                info["reason"] = "在忽略列表中,会被忽略"
            else:
                info["reason"] = "不在忽略列表中,不会被忽略"
            return info
        except Exception as e:
            logger.error(f"获取忽略状态信息异常: w_id={w_id}, error={str(e)}")
            return {
                "w_id": w_id,
                "error": str(e),
                "final_ignored": False
            }
# 全局好友忽略服务实例
friend_ignore_service = FriendIgnoreService()
app/services/message_processor.py
@@ -16,6 +16,7 @@
from app.services.redis_queue import redis_queue
from app.services.ecloud_client import ecloud_client
from app.services.dify_client import dify_client
from app.services.friend_ignore_service import friend_ignore_service
from config import settings
@@ -99,6 +100,12 @@
            or not data.get("content")
        ):
            logger.warning(f"消息缺少必要字段: data={data}")
            return False
        # 检查发送者是否在好友忽略列表中
        from_user = data.get("fromUser")
        if friend_ignore_service.is_friend_ignored(from_user):
            logger.info(f"忽略好友发送的消息: fromUser={from_user}")
            return False
        return True
@@ -210,8 +217,8 @@
                # 3.2 获取用户在当前群组的conversation_id
                conversation_id = redis_queue.get_conversation_id(from_user, from_group)
                # 调用Dify接口发送消息
                dify_response = dify_client.send_chat_message(
                # 调用Dify接口发送消息(根据配置选择模式)
                dify_response = dify_client.send_message(
                    query=content, user=from_user, conversation_id=conversation_id
                )
config.example.json
File was deleted
config.production.json
File was deleted
config.py
@@ -16,91 +16,62 @@
    def _load_config(self):
        """从JSON文件加载配置"""
        if not os.path.exists(self.config_file):
            raise FileNotFoundError(f"配置文件 {self.config_file} 不存在")
        try:
            if os.path.exists(self.config_file):
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    config_data = json.load(f)
                self._set_config_from_dict(config_data)
            else:
                # 如果配置文件不存在,使用默认值
                self._set_default_config()
            with open(self.config_file, 'r', encoding='utf-8') as f:
                config_data = json.load(f)
            self._set_config_from_dict(config_data)
        except Exception as e:
            print(f"加载配置文件失败: {e}")
            self._set_default_config()
            raise Exception(f"加载配置文件失败: {e}")
    def _set_config_from_dict(self, config_data: dict):
        """从字典设置配置"""
        # 数据库配置
        self.database_url = config_data.get("database", {}).get("url", "mysql+pymysql://root:TAI%402019%23Zjun@120.24.39.179:3306/ecloud_dify")
        self.database_url = config_data["database"]["url"]
        # Redis配置
        self.redis_url = config_data.get("redis", {}).get("url", "redis://localhost:6379/0")
        self.redis_url = config_data["redis"]["url"]
        # E云管家配置
        ecloud_config = config_data.get("ecloud", {})
        self.ecloud_base_url = ecloud_config.get("base_url", "http://125.122.152.142:9899")
        self.ecloud_authorization = ecloud_config.get("authorization", "eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzYxMTQ1MjE3NSIsInBhc3N3b3JkIjoiJDJhJDEwJEU3Ry5LOEJzekphM2JGQlh0SG8vOXVrUk1NalVweGVVemguUDRnMkJBdHN2YXpBb0JIQWJpIn0.Gd2vbeJjL5pUGFhUngWPLkDTLhD3GUaEPXOkdoTf4KRh9o2FtST1OZJxmZuGdUy7WIYlIPVueoVyIu5iHOyi8A")
        self.ecloud_w_id = ecloud_config.get("w_id", "")
        ecloud_config = config_data["ecloud"]
        self.ecloud_base_url = ecloud_config["base_url"]
        self.ecloud_authorization = ecloud_config["authorization"]
        self.ecloud_w_id = ecloud_config["w_id"]
        # DifyAI配置
        dify_config = config_data.get("dify", {})
        self.dify_base_url = dify_config.get("base_url", "https://api.dify.ai/v1")
        self.dify_api_key = dify_config.get("api_key", "app-OMnBr7zsf5UTV83Ey8QcSErA")
        dify_config = config_data["dify"]
        self.dify_base_url = dify_config["base_url"]
        self.dify_api_key = dify_config["api_key"]
        self.dify_streaming_enabled = dify_config["streaming_enabled"]
        self.dify_streaming_timeout = dify_config["streaming_timeout"]
        # 服务配置
        server_config = config_data.get("server", {})
        self.server_host = server_config.get("host", "0.0.0.0")
        self.server_port = server_config.get("port", 7979)
        self.debug = server_config.get("debug", True)
        server_config = config_data["server"]
        self.server_host = server_config["host"]
        self.server_port = server_config["port"]
        self.debug = server_config["debug"]
        # 日志配置
        logging_config = config_data.get("logging", {})
        self.log_level = logging_config.get("level", "INFO")
        self.log_file = logging_config.get("file", "logs/app.log")
        logging_config = config_data["logging"]
        self.log_level = logging_config["level"]
        self.log_file = logging_config["file"]
        # 消息处理配置
        msg_config = config_data.get("message_processing", {})
        self.max_retry_count = msg_config.get("max_retry_count", 3)
        self.retry_delay = msg_config.get("retry_delay", 5)
        self.queue_timeout = msg_config.get("queue_timeout", 300)
        msg_config = config_data["message_processing"]
        self.max_retry_count = msg_config["max_retry_count"]
        self.retry_delay = msg_config["retry_delay"]
        self.queue_timeout = msg_config["queue_timeout"]
        # 客服配置
        customer_service_config = config_data.get("customer_service", {})
        self.customer_service_names = customer_service_config.get("names", ["客服1", "客服2"])
        customer_service_config = config_data["customer_service"]
        self.customer_service_names = customer_service_config["names"]
    def _set_default_config(self):
        """设置默认配置"""
        # 数据库配置
        self.database_url = "mysql+pymysql://root:TAI%402019%23Zjun@120.24.39.179:3306/ecloud_dify"
        # Redis配置
        self.redis_url = "redis://localhost:6379/0"
        # E云管家配置
        self.ecloud_base_url = "http://125.122.152.142:9899"
        self.ecloud_authorization = "eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzYxMTQ1MjE3NSIsInBhc3N3b3JkIjoiJDJhJDEwJEU3Ry5LOEJzekphM2JGQlh0SG8vOXVrUk1NalVweGVVemguUDRnMkJBdHN2YXpBb0JIQWJpIn0.Gd2vbeJjL5pUGFhUngWPLkDTLhD3GUaEPXOkdoTf4KRh9o2FtST1OZJxmZuGdUy7WIYlIPVueoVyIu5iHOyi8A"
        self.ecloud_w_id = ""
        # DifyAI配置
        self.dify_base_url = "https://api.dify.ai/v1"
        self.dify_api_key = "app-OMnBr7zsf5UTV83Ey8QcSErA"
        # 服务配置
        self.server_host = "0.0.0.0"
        self.server_port = 7979
        self.debug = True
        # 日志配置
        self.log_level = "INFO"
        self.log_file = "logs/app.log"
        # 消息处理配置
        self.max_retry_count = 3
        self.retry_delay = 5
        self.queue_timeout = 300
        # 客服配置
        self.customer_service_names = ["客服1", "客服2"]
        # 好友忽略配置
        friend_ignore_config = config_data["friend_ignore"]
        self.friend_ignore_enabled = friend_ignore_config["enabled"]
        self.friend_ignore_whitelist = friend_ignore_config["whitelist"]
# 全局配置实例
docker-compose.yml
File was deleted
install_service.bat
File was deleted
logs/app.log
@@ -1,9 +1,4 @@
2025-07-23 14:46:37 | INFO | __main__:<module>:105 - 启动E云管家-DifyAI对接服务
2025-07-23 15:03:24 | INFO | __main__:<module>:105 - 启动E云管家-DifyAI对接服务
2025-07-23 16:48:21 | INFO | __main__:<module>:121 - 启动E云管家-DifyAI对接服务
2025-07-23 16:50:14 | INFO | __main__:<module>:121 - 启动E云管家-DifyAI对接服务
2025-07-23 16:55:09 | INFO | __main__:<module>:121 - 启动E云管家-DifyAI对接服务
2025-07-23 16:56:49 | INFO | __main__:<module>:121 - 启动E云管家-DifyAI对接服务
2025-07-23 17:19:58 | INFO | __main__:<module>:121 - 启动E云管家-DifyAI对接服务
2025-07-23 17:43:16 | INFO | __main__:<module>:105 - 启动E云管家-DifyAI对接服务
2025-07-23 17:44:16 | INFO | __main__:<module>:105 - 启动E云管家-DifyAI对接服务
2025-07-28 09:54:56 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务
2025-07-28 10:41:14 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务
2025-07-28 10:42:46 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务
2025-07-28 11:10:52 | INFO | __main__:<module>:122 - 启动E云管家-DifyAI对接服务
main.py
@@ -10,8 +10,10 @@
import time
from config import settings
from app.api.callback import router as callback_router
from app.api.friend_ignore import router as friend_ignore_router
from app.models.database import create_tables
from app.workers.message_worker import message_worker
from app.services.contact_sync import contact_sync_service
@asynccontextmanager
@@ -33,6 +35,20 @@
        logger.info("消息工作进程启动成功")
    except Exception as e:
        logger.error(f"消息工作进程启动失败: {str(e)}")
    # 同步联系人信息并建立好友忽略列表
    try:
        if settings.ecloud_w_id:
            logger.info("开始同步联系人信息...")
            success = contact_sync_service.sync_contacts_on_startup(settings.ecloud_w_id)
            if success:
                logger.info("联系人同步完成,好友忽略列表已建立")
            else:
                logger.warning("联系人同步失败")
        else:
            logger.warning("未配置ecloud_w_id,跳过联系人同步")
    except Exception as e:
        logger.error(f"联系人同步异常: {str(e)}")
    logger.info("应用启动完成")
@@ -70,6 +86,7 @@
# 注册路由
app.include_router(callback_router, prefix="/api/v1", tags=["回调接口"])
app.include_router(friend_ignore_router, prefix="/api/v1", tags=["好友忽略管理"])
@app.get("/")
start.bat
File was deleted
start.sh
File was deleted
startup.py
File was deleted
test_integration.py
File was deleted
tests/test_dify_streaming.py
New file
@@ -0,0 +1,174 @@
"""
测试Dify流式模式功能
"""
import pytest
import json
from unittest.mock import Mock, patch, MagicMock
from app.services.dify_client import DifyClient
from config import settings
class TestDifyStreaming:
    """测试Dify流式模式"""
    def setup_method(self):
        """测试前设置"""
        self.client = DifyClient()
    def test_process_stream_response_success(self):
        """测试成功处理流式响应"""
        # 模拟流式响应数据
        stream_data = [
            "data: {\"event\": \"message\", \"task_id\": \"test-task\", \"id\": \"test-msg\", \"conversation_id\": \"test-conv\", \"answer\": \"Hello\", \"created_at\": 1705398420}",
            "data: {\"event\": \"message\", \"task_id\": \"test-task\", \"id\": \"test-msg\", \"conversation_id\": \"test-conv\", \"answer\": \" World\", \"created_at\": 1705398420}",
            "data: {\"event\": \"message_end\", \"metadata\": {\"usage\": {\"total_tokens\": 10}}, \"usage\": {\"total_tokens\": 10}}",
        ]
        # 创建模拟响应对象
        mock_response = Mock()
        mock_response.iter_lines.return_value = stream_data
        # 测试处理流式响应
        result = self.client._process_stream_response(mock_response, "test_user")
        # 验证结果
        assert result is not None
        assert result["answer"] == "Hello World"
        assert result["conversation_id"] == "test-conv"
        assert result["task_id"] == "test-task"
        assert result["usage"]["total_tokens"] == 10
    def test_process_stream_response_error(self):
        """测试处理流式响应错误"""
        # 模拟错误响应数据
        stream_data = [
            "data: {\"event\": \"error\", \"message\": \"API调用失败\", \"code\": \"500\"}",
        ]
        # 创建模拟响应对象
        mock_response = Mock()
        mock_response.iter_lines.return_value = stream_data
        # 测试处理流式响应
        result = self.client._process_stream_response(mock_response, "test_user")
        # 验证结果
        assert result is None
    def test_process_stream_response_incomplete(self):
        """测试处理不完整的流式响应"""
        # 模拟不完整响应数据(缺少message_end事件)
        stream_data = [
            "data: {\"event\": \"message\", \"task_id\": \"test-task\", \"id\": \"test-msg\", \"conversation_id\": \"test-conv\", \"answer\": \"Hello\", \"created_at\": 1705398420}",
        ]
        # 创建模拟响应对象
        mock_response = Mock()
        mock_response.iter_lines.return_value = stream_data
        # 测试处理流式响应
        result = self.client._process_stream_response(mock_response, "test_user")
        # 验证结果 - 即使没有message_end事件,只要有内容和conversation_id也应该返回结果
        assert result is not None
        assert result["answer"] == "Hello"
        assert result["conversation_id"] == "test-conv"
    @patch('app.services.dify_client.settings')
    def test_send_message_uses_streaming_when_enabled(self, mock_settings):
        """测试当启用流式模式时使用流式发送"""
        # 设置配置为启用流式模式
        mock_settings.dify_streaming_enabled = True
        # 模拟流式发送方法
        with patch.object(self.client, 'send_chat_message_stream') as mock_stream:
            mock_stream.return_value = {"answer": "test response", "conversation_id": "test-conv"}
            result = self.client.send_message("test query", "test_user")
            # 验证调用了流式方法
            mock_stream.assert_called_once_with("test query", "test_user", None, None)
            assert result["answer"] == "test response"
    @patch('app.services.dify_client.settings')
    def test_send_message_uses_blocking_when_disabled(self, mock_settings):
        """测试当禁用流式模式时使用阻塞发送"""
        # 设置配置为禁用流式模式
        mock_settings.dify_streaming_enabled = False
        # 模拟阻塞发送方法
        with patch.object(self.client, 'send_chat_message') as mock_blocking:
            mock_blocking.return_value = {"answer": "test response", "conversation_id": "test-conv"}
            result = self.client.send_message("test query", "test_user")
            # 验证调用了阻塞方法
            mock_blocking.assert_called_once_with("test query", "test_user", None, None)
            assert result["answer"] == "test response"
    @patch('app.services.dify_client.settings')
    def test_send_message_force_streaming_override(self, mock_settings):
        """测试强制流式模式覆盖配置"""
        # 设置配置为禁用流式模式
        mock_settings.dify_streaming_enabled = False
        # 模拟流式发送方法
        with patch.object(self.client, 'send_chat_message_stream') as mock_stream:
            mock_stream.return_value = {"answer": "test response", "conversation_id": "test-conv"}
            # 强制使用流式模式
            result = self.client.send_message("test query", "test_user", force_streaming=True)
            # 验证调用了流式方法(覆盖了配置)
            mock_stream.assert_called_once_with("test query", "test_user", None, None)
            assert result["answer"] == "test response"
    def test_process_stream_response_with_ping_events(self):
        """测试处理包含ping事件的流式响应"""
        # 模拟包含ping事件的响应数据
        stream_data = [
            "data: {\"event\": \"ping\"}",
            "data: {\"event\": \"message\", \"task_id\": \"test-task\", \"id\": \"test-msg\", \"conversation_id\": \"test-conv\", \"answer\": \"Hello\", \"created_at\": 1705398420}",
            "data: {\"event\": \"ping\"}",
            "data: {\"event\": \"message\", \"task_id\": \"test-task\", \"id\": \"test-msg\", \"conversation_id\": \"test-conv\", \"answer\": \" World\", \"created_at\": 1705398420}",
            "data: {\"event\": \"message_end\", \"metadata\": {\"usage\": {\"total_tokens\": 10}}}",
        ]
        # 创建模拟响应对象
        mock_response = Mock()
        mock_response.iter_lines.return_value = stream_data
        # 测试处理流式响应
        result = self.client._process_stream_response(mock_response, "test_user")
        # 验证结果(ping事件应该被忽略)
        assert result is not None
        assert result["answer"] == "Hello World"
        assert result["conversation_id"] == "test-conv"
    def test_process_stream_response_with_invalid_json(self):
        """测试处理包含无效JSON的流式响应"""
        # 模拟包含无效JSON的响应数据
        stream_data = [
            "data: {\"event\": \"message\", \"task_id\": \"test-task\", \"id\": \"test-msg\", \"conversation_id\": \"test-conv\", \"answer\": \"Hello\", \"created_at\": 1705398420}",
            "data: invalid json data",
            "data: {\"event\": \"message\", \"task_id\": \"test-task\", \"id\": \"test-msg\", \"conversation_id\": \"test-conv\", \"answer\": \" World\", \"created_at\": 1705398420}",
            "data: {\"event\": \"message_end\"}",
        ]
        # 创建模拟响应对象
        mock_response = Mock()
        mock_response.iter_lines.return_value = stream_data
        # 测试处理流式响应
        result = self.client._process_stream_response(mock_response, "test_user")
        # 验证结果(无效JSON应该被跳过)
        assert result is not None
        assert result["answer"] == "Hello World"
        assert result["conversation_id"] == "test-conv"
if __name__ == "__main__":
    pytest.main([__file__])
tests/test_friend_ignore_service.py
New file
@@ -0,0 +1,258 @@
"""
好友忽略服务测试
"""
import pytest
from unittest.mock import Mock, patch
from app.services.friend_ignore_service import FriendIgnoreService
class TestFriendIgnoreService:
    """好友忽略服务测试类"""
    def setup_method(self):
        """测试前准备"""
        self.service = FriendIgnoreService()
    @patch('app.services.friend_ignore_service.redis_queue')
    def test_add_friends_to_ignore_list_success(self, mock_redis_queue):
        """测试成功添加好友到忽略列表"""
        # 模拟Redis操作
        mock_redis_client = Mock()
        mock_redis_queue.redis_client = mock_redis_client
        friends = ["wxid_test1", "wxid_test2", "wxid_test3"]
        result = self.service.add_friends_to_ignore_list(friends)
        # 验证结果
        assert result is True
        # 验证Redis操作被调用
        mock_redis_client.delete.assert_called_once_with(self.service.ignore_list_key)
        mock_redis_client.sadd.assert_called_once_with(self.service.ignore_list_key, *friends)
    @patch('app.services.friend_ignore_service.redis_queue')
    def test_add_friends_to_ignore_list_empty(self, mock_redis_queue):
        """测试添加空好友列表"""
        result = self.service.add_friends_to_ignore_list([])
        # 验证结果
        assert result is True
        # 验证Redis操作未被调用
        mock_redis_queue.redis_client.delete.assert_not_called()
        mock_redis_queue.redis_client.sadd.assert_not_called()
    @patch('app.services.friend_ignore_service.redis_queue')
    def test_is_friend_ignored_true(self, mock_redis_queue):
        """测试检查好友在忽略列表中"""
        mock_redis_client = Mock()
        mock_redis_queue.redis_client = mock_redis_client
        mock_redis_client.sismember.return_value = True
        result = self.service.is_friend_ignored("wxid_test1")
        assert result is True
        mock_redis_client.sismember.assert_called_once_with(self.service.ignore_list_key, "wxid_test1")
    @patch('app.services.friend_ignore_service.redis_queue')
    def test_is_friend_ignored_false(self, mock_redis_queue):
        """测试检查好友不在忽略列表中"""
        mock_redis_client = Mock()
        mock_redis_queue.redis_client = mock_redis_client
        mock_redis_client.sismember.return_value = False
        result = self.service.is_friend_ignored("wxid_test1")
        assert result is False
        mock_redis_client.sismember.assert_called_once_with(self.service.ignore_list_key, "wxid_test1")
    @patch('app.services.friend_ignore_service.redis_queue')
    def test_get_ignore_list(self, mock_redis_queue):
        """测试获取忽略列表"""
        mock_redis_client = Mock()
        mock_redis_queue.redis_client = mock_redis_client
        expected_set = {"wxid_test1", "wxid_test2"}
        mock_redis_client.smembers.return_value = expected_set
        result = self.service.get_ignore_list()
        assert result == expected_set
        mock_redis_client.smembers.assert_called_once_with(self.service.ignore_list_key)
    @patch('app.services.friend_ignore_service.redis_queue')
    def test_remove_friend_from_ignore_list(self, mock_redis_queue):
        """测试从忽略列表移除好友"""
        mock_redis_client = Mock()
        mock_redis_queue.redis_client = mock_redis_client
        mock_redis_client.srem.return_value = 1  # 表示成功移除
        result = self.service.remove_friend_from_ignore_list("wxid_test1")
        assert result is True
        mock_redis_client.srem.assert_called_once_with(self.service.ignore_list_key, "wxid_test1")
    @patch('app.services.friend_ignore_service.redis_queue')
    def test_clear_ignore_list(self, mock_redis_queue):
        """测试清空忽略列表"""
        mock_redis_client = Mock()
        mock_redis_queue.redis_client = mock_redis_client
        result = self.service.clear_ignore_list()
        assert result is True
        mock_redis_client.delete.assert_called_once_with(self.service.ignore_list_key)
    @patch('app.services.friend_ignore_service.redis_queue')
    def test_get_ignore_list_count(self, mock_redis_queue):
        """测试获取忽略列表数量"""
        mock_redis_client = Mock()
        mock_redis_queue.redis_client = mock_redis_client
        mock_redis_client.scard.return_value = 5
        result = self.service.get_ignore_list_count()
        assert result == 5
        mock_redis_client.scard.assert_called_once_with(self.service.ignore_list_key)
    @patch('app.services.friend_ignore_service.redis_queue')
    def test_add_friends_exception_handling(self, mock_redis_queue):
        """测试添加好友时的异常处理"""
        mock_redis_client = Mock()
        mock_redis_queue.redis_client = mock_redis_client
        mock_redis_client.sadd.side_effect = Exception("Redis error")
        result = self.service.add_friends_to_ignore_list(["wxid_test1"])
        assert result is False
    @patch('app.services.friend_ignore_service.redis_queue')
    def test_is_friend_ignored_exception_handling(self, mock_redis_queue):
        """测试检查好友时的异常处理"""
        mock_redis_client = Mock()
        mock_redis_queue.redis_client = mock_redis_client
        mock_redis_client.sismember.side_effect = Exception("Redis error")
        result = self.service.is_friend_ignored("wxid_test1")
        assert result is False
    @patch('app.services.friend_ignore_service.settings')
    @patch('app.services.friend_ignore_service.redis_queue')
    @patch('app.services.friend_ignore_service.get_db')
    def test_is_friend_ignored_whitelist(self, mock_get_db, mock_redis_queue, mock_settings):
        """测试白名单功能"""
        # 模拟配置
        mock_settings.friend_ignore_enabled = True
        mock_settings.friend_ignore_whitelist = ["测试用户1", "测试用户2"]
        # 模拟数据库查询
        mock_db = Mock()
        mock_get_db.return_value.__next__.return_value.__enter__.return_value = mock_db
        mock_get_db.return_value.__next__.return_value.__exit__.return_value = None
        # 模拟联系人查询结果 - 根据昵称返回不同的联系人
        def mock_query_side_effect(*args, **kwargs):
            mock_query = Mock()
            mock_filter = Mock()
            mock_query.filter.return_value = mock_filter
            # 根据查询条件返回不同的结果
            def mock_first():
                # 这里简化处理,假设查询"测试用户1"时返回wxid_whitelist1
                mock_contact = Mock()
                mock_contact.wc_id = "wxid_whitelist1"
                return mock_contact
            mock_filter.first = mock_first
            return mock_query
        mock_db.query.side_effect = mock_query_side_effect
        # 模拟Redis操作
        mock_redis_client = Mock()
        mock_redis_queue.redis_client = mock_redis_client
        mock_redis_client.sismember.return_value = True  # 在忽略列表中
        # 测试白名单用户(即使在忽略列表中也不应该被忽略)
        result = self.service.is_friend_ignored("wxid_whitelist1")
        assert result is False
        # 测试非白名单用户(在忽略列表中应该被忽略)
        result = self.service.is_friend_ignored("wxid_normal_user")
        assert result is True
    @patch('app.services.friend_ignore_service.settings')
    def test_is_friend_ignored_disabled(self, mock_settings):
        """测试功能禁用时的行为"""
        mock_settings.friend_ignore_enabled = False
        mock_settings.friend_ignore_whitelist = []
        result = self.service.is_friend_ignored("wxid_test1")
        assert result is False
    @patch('app.services.friend_ignore_service.settings')
    @patch('app.services.friend_ignore_service.redis_queue')
    @patch('app.services.friend_ignore_service.get_db')
    def test_get_ignore_status_info(self, mock_get_db, mock_redis_queue, mock_settings):
        """测试获取详细状态信息"""
        mock_settings.friend_ignore_enabled = True
        mock_settings.friend_ignore_whitelist = ["测试用户1"]
        # 模拟数据库查询
        mock_db = Mock()
        mock_get_db.return_value.__enter__.return_value = mock_db
        mock_get_db.return_value.__exit__.return_value = None
        # 模拟联系人查询结果
        mock_contact = Mock()
        mock_contact.wc_id = "wxid_whitelist1"
        mock_db.query.return_value.filter.return_value.first.return_value = mock_contact
        mock_redis_client = Mock()
        mock_redis_queue.redis_client = mock_redis_client
        mock_redis_client.sismember.return_value = True
        # 测试白名单用户
        result = self.service.get_ignore_status_info("wxid_whitelist1")
        assert result["w_id"] == "wxid_whitelist1"
        assert result["in_whitelist"] is True
        assert result["final_ignored"] is False
        assert "白名单" in result["reason"]
        assert result["whitelist_nicknames"] == ["测试用户1"]
        # 测试普通用户
        result = self.service.get_ignore_status_info("wxid_normal_user")
        assert result["w_id"] == "wxid_normal_user"
        assert result["in_whitelist"] is False
        assert result["in_ignore_list"] is True
        assert result["final_ignored"] is True
    @patch('app.services.friend_ignore_service.FriendIgnoreService._get_whitelist_wids')
    @patch('app.services.friend_ignore_service.settings')
    @patch('app.services.friend_ignore_service.redis_queue')
    def test_is_friend_ignored_nickname_whitelist(self, mock_redis_queue, mock_settings, mock_get_whitelist_wids):
        """测试昵称白名单功能"""
        # 模拟配置
        mock_settings.friend_ignore_enabled = True
        mock_settings.friend_ignore_whitelist = ["测试用户1", "测试用户2"]
        # 模拟白名单w_id转换结果
        mock_get_whitelist_wids.return_value = ["wxid_whitelist1", "wxid_whitelist2"]
        # 模拟Redis操作
        mock_redis_client = Mock()
        mock_redis_queue.redis_client = mock_redis_client
        mock_redis_client.sismember.return_value = True  # 在忽略列表中
        # 测试白名单用户(即使在忽略列表中也不应该被忽略)
        result = self.service.is_friend_ignored("wxid_whitelist1")
        assert result is False
        # 测试非白名单用户(在忽略列表中应该被忽略)
        result = self.service.is_friend_ignored("wxid_normal_user")
        assert result is True
        # 验证方法被调用
        assert mock_get_whitelist_wids.call_count >= 2
tests/test_message_processor.py
@@ -71,6 +71,44 @@
        
        result = self.processor.is_valid_group_message(callback_data)
        assert result is False
    @patch('app.services.message_processor.friend_ignore_service')
    def test_is_valid_group_message_friend_ignored(self, mock_friend_ignore_service):
        """测试好友在忽略列表中的消息"""
        mock_friend_ignore_service.is_friend_ignored.return_value = True
        callback_data = {
            "messageType": "80001",
            "data": {
                "fromUser": "wxid_test123",
                "fromGroup": "group123@chatroom",
                "content": "测试消息",
                "self": False
            }
        }
        result = self.processor.is_valid_group_message(callback_data)
        assert result is False
        mock_friend_ignore_service.is_friend_ignored.assert_called_once_with("wxid_test123")
    @patch('app.services.message_processor.friend_ignore_service')
    def test_is_valid_group_message_friend_not_ignored(self, mock_friend_ignore_service):
        """测试好友不在忽略列表中的消息"""
        mock_friend_ignore_service.is_friend_ignored.return_value = False
        callback_data = {
            "messageType": "80001",
            "data": {
                "fromUser": "wxid_test123",
                "fromGroup": "group123@chatroom",
                "content": "测试消息",
                "self": False
            }
        }
        result = self.processor.is_valid_group_message(callback_data)
        assert result is True
        mock_friend_ignore_service.is_friend_ignored.assert_called_once_with("wxid_test123")
    
    @patch('app.services.message_processor.redis_queue')
    def test_enqueue_callback_message_success(self, mock_redis_queue):
uninstall_service.bat
File was deleted