#!/usr/bin/env python3 """ 集成测试脚本 用于测试E云管家-DifyAI对接服务的基本功能 """ import requests import time from datetime import datetime def test_service_health(): """测试服务健康状态""" try: response = requests.get("http://localhost:8000/api/v1/health", timeout=5) if response.status_code == 200: print("✅ 服务健康检查通过") return True else: print(f"❌ 服务健康检查失败: {response.status_code}") return False except Exception as e: print(f"❌ 服务连接失败: {str(e)}") return False def test_callback_api(): """测试回调API""" # 模拟E云管家回调数据 callback_data = { "account": "17200000000", "messageType": "80001", "wcId": "wxid_test_user", "data": { "content": "你好,这是一条测试消息", "fromGroup": "test_group@chatroom", "fromUser": "wxid_test_user_123", "memberCount": 5, "msgId": int(time.time()), "newMsgId": int(time.time() * 1000), "self": False, "timestamp": int(time.time()), "toUser": "wxid_bot", "wId": "test-instance-id", }, } try: response = requests.post( "http://localhost:8000/api/v1/callback", json=callback_data, timeout=10 ) if response.status_code == 200: result = response.json() if result.get("success"): print("✅ 回调API测试通过") return True else: print(f"❌ 回调API处理失败: {result.get('message')}") return False else: print(f"❌ 回调API请求失败: {response.status_code}") return False except Exception as e: print(f"❌ 回调API测试异常: {str(e)}") return False def test_invalid_message(): """测试无效消息处理""" # 测试非群聊消息 invalid_data = { "account": "17200000000", "messageType": "80002", # 非群聊消息 "wcId": "wxid_test_user", "data": { "content": "这是一条私聊消息", "fromUser": "wxid_test_user_123", "self": False, }, } try: response = requests.post( "http://localhost:8000/api/v1/callback", json=invalid_data, timeout=10 ) if response.status_code == 200: result = response.json() if not result.get("success"): print("✅ 无效消息过滤测试通过") return True else: print("❌ 无效消息未被正确过滤") return False else: print(f"❌ 无效消息测试请求失败: {response.status_code}") return False except Exception as e: print(f"❌ 无效消息测试异常: {str(e)}") return False def test_self_message(): """测试自己发送的消息过滤""" self_message_data = { "account": "17200000000", "messageType": "80001", "wcId": "wxid_test_user", "data": { "content": "这是我自己发送的消息", "fromGroup": "test_group@chatroom", "fromUser": "wxid_test_user_123", "self": True, # 自己发送的消息 "timestamp": int(time.time()), }, } try: response = requests.post( "http://localhost:8000/api/v1/callback", json=self_message_data, timeout=10 ) if response.status_code == 200: result = response.json() if not result.get("success"): print("✅ 自发消息过滤测试通过") return True else: print("❌ 自发消息未被正确过滤") return False else: print(f"❌ 自发消息测试请求失败: {response.status_code}") return False except Exception as e: print(f"❌ 自发消息测试异常: {str(e)}") return False def main(): """主测试函数""" print("=" * 50) print("E云管家-DifyAI对接服务集成测试") print("=" * 50) print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print() tests = [ ("服务健康检查", test_service_health), ("回调API功能", test_callback_api), ("无效消息过滤", test_invalid_message), ("自发消息过滤", test_self_message), ] passed = 0 total = len(tests) for test_name, test_func in tests: print(f"正在测试: {test_name}") if test_func(): passed += 1 print() print("=" * 50) print(f"测试结果: {passed}/{total} 通过") if passed == total: print("🎉 所有测试通过!") return 0 else: print("⚠️ 部分测试失败,请检查服务配置") return 1 if __name__ == "__main__": exit(main())