yj
2025-07-23 1225b6cbf0a028b765a0ab6d784bcb80459a67bb
功能更新
10个文件已添加
14个文件已修改
1个文件已删除
1879 ■■■■■ 已修改文件
.gitignore 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
CONFIG_GUIDE.md 103 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
DEPLOYMENT_WINDOWS.md 186 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
E云管家接口文档.txt 176 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
README.md 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/api/callback.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/models/conversation.py 47 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/services/contact_sync.py 167 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/services/ecloud_client.py 234 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/services/message_processor.py 159 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/services/redis_queue.py 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/utils/logger.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/workers/message_worker.py 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.example.json 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.production.json 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.py 114 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ecloud_dify.spec 127 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
init.sql 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
install_service.bat 102 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
logs/app.log 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
requirements.txt 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
start.bat 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
startup.py 139 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
uninstall_service.bat 61 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -3,6 +3,7 @@
# å¿½ç•¥ç‰¹å®šæ–‡ä»¶
config.ini
config.json
secrets.json
# å¿½ç•¥æ•´ä¸ªç›®å½•
@@ -25,4 +26,9 @@
.idea/
# ä¾‹å¤–:不忽略 dist ç›®å½•下的 important.js
!dist/important.js
!dist/important.js
# åŒæ—¶å¿½ç•¥æ‰€æœ‰ .pyc æ–‡ä»¶ï¼ˆå¯é€‰ä½†æŽ¨èï¼‰
*.pyc
*.pyo
*.pyd
CONFIG_GUIDE.md
New file
@@ -0,0 +1,103 @@
# é…ç½®æ–‡ä»¶ä½¿ç”¨æŒ‡å—
## æ¦‚è¿°
项目已从Python配置文件(config.py)转换为JSON配置文件(config.json),这样更适合打包为exe文件。
## é…ç½®æ–‡ä»¶ç»“æž„
### config.json
```json
{
  "database": {
    "url": "mysql+pymysql://root:password@host:port/database"
  },
  "redis": {
    "url": "redis://localhost:6379/0"
  },
  "ecloud": {
    "base_url": "http://125.122.152.142:9899",
    "authorization": "your_authorization_token"
  },
  "dify": {
    "base_url": "https://api.dify.ai/v1",
    "api_key": "your_dify_api_key"
  },
  "server": {
    "host": "0.0.0.0",
    "port": 7979,
    "debug": true
  },
  "logging": {
    "level": "INFO",
    "file": "logs/app.log"
  },
  "message_processing": {
    "max_retry_count": 3,
    "retry_delay": 5,
    "queue_timeout": 300
  }
}
```
## é…ç½®é¡¹è¯´æ˜Ž
### æ•°æ®åº“配置 (database)
- `url`: æ•°æ®åº“连接字符串,支持MySQL
### Redis配置 (redis)
- `url`: Redis连接字符串
### E云管家配置 (ecloud)
- `base_url`: E云管家API基础URL
- `authorization`: E云管家API授权令牌
### DifyAI配置 (dify)
- `base_url`: DifyAI API基础URL
- `api_key`: DifyAI API密钥
### æœåŠ¡å™¨é…ç½® (server)
- `host`: æœåŠ¡å™¨ç›‘å¬åœ°å€
- `port`: æœåŠ¡å™¨ç›‘å¬ç«¯å£
- `debug`: æ˜¯å¦å¯ç”¨è°ƒè¯•模式
### æ—¥å¿—配置 (logging)
- `level`: æ—¥å¿—级别 (DEBUG, INFO, WARNING, ERROR)
- `file`: æ—¥å¿—文件路径
### æ¶ˆæ¯å¤„理配置 (message_processing)
- `max_retry_count`: æœ€å¤§é‡è¯•次数
- `retry_delay`: é‡è¯•延迟时间(秒)
- `queue_timeout`: é˜Ÿåˆ—超时时间(秒)
## ä½¿ç”¨æ–¹æ³•
### 1. ä¿®æ”¹é…ç½®
直接编辑 `config.json` æ–‡ä»¶å³å¯ï¼Œåº”用会自动加载新配置。
### 2. é…ç½®æ–‡ä»¶ä½ç½®
- å¼€å‘环境:项目根目录下的 `config.json`
- ç”Ÿäº§çŽ¯å¢ƒï¼šexe文件同目录下的 `config.json`
### 3. é…ç½®éªŒè¯
如果配置文件不存在或格式错误,系统会使用默认配置并输出错误信息。
## å…¼å®¹æ€§
- åŽŸæœ‰çš„ `from config import settings` å¯¼å…¥æ–¹å¼ä¿æŒä¸å˜
- æ‰€æœ‰é…ç½®å±žæ€§çš„访问方式保持不变(如 `settings.database_url`)
- å‘后兼容,不需要修改现有代码
## æ‰“包为exe的优势
1. **配置外部化**: é…ç½®æ–‡ä»¶ç‹¬ç«‹äºŽexe文件,便于部署时修改
2. **无需重新编译**: ä¿®æ”¹é…ç½®ä¸éœ€è¦é‡æ–°æ‰“包exe
3. **易于维护**: JSON格式直观易读,便于运维人员配置
4. **版本控制友好**: å¯ä»¥ä¸ºä¸åŒçŽ¯å¢ƒå‡†å¤‡ä¸åŒçš„é…ç½®æ–‡ä»¶
## æ³¨æ„äº‹é¡¹
1. ç¡®ä¿ `config.json` æ–‡ä»¶æ ¼å¼æ­£ç¡®ï¼Œå¯ä»¥ä½¿ç”¨JSON验证工具检查
2. æ•æ„Ÿä¿¡æ¯ï¼ˆå¦‚数据库密码、API密钥)应妥善保管
3. ç”Ÿäº§çŽ¯å¢ƒå»ºè®®å°† `debug` è®¾ç½®ä¸º `false`
4. æ—¥å¿—文件路径确保应用有写入权限
DEPLOYMENT_WINDOWS.md
New file
@@ -0,0 +1,186 @@
# E云管家-DifyAI对接服务 Windows部署指南
## æ¦‚è¿°
本文档介绍如何在Windows服务器上部署E云管家-DifyAI对接服务的exe版本。
## ç³»ç»Ÿè¦æ±‚
- Windows Server 2016 æˆ–更高版本
- Windows 10/11 (用于测试)
- è‡³å°‘ 2GB å¯ç”¨å†…å­˜
- è‡³å°‘ 1GB å¯ç”¨ç£ç›˜ç©ºé—´
- ç½‘络连接(用于访问数据库、Redis、E云管家和DifyAI服务)
## éƒ¨ç½²æ­¥éª¤
### 1. å‡†å¤‡éƒ¨ç½²æ–‡ä»¶
确保以下文件存在于部署目录中:
- `ecloud_dify.exe` - ä¸»ç¨‹åºæ–‡ä»¶
- `config.production.json` - ç”Ÿäº§çŽ¯å¢ƒé…ç½®æ¨¡æ¿
- `config.example.json` - é…ç½®ç¤ºä¾‹æ–‡ä»¶
- `install_service.bat` - Windows服务安装脚本
- `uninstall_service.bat` - Windows服务卸载脚本
- `start.bat` - ç›´æŽ¥å¯åŠ¨è„šæœ¬ï¼ˆéžæœåŠ¡æ¨¡å¼ï¼‰
### 2. é…ç½®æœåŠ¡
#### 2.1 åˆ›å»ºé…ç½®æ–‡ä»¶
1. å¤åˆ¶ `config.production.json` ä¸º `config.json`
2. ç¼–辑 `config.json` æ–‡ä»¶ï¼Œä¿®æ”¹ä»¥ä¸‹é…ç½®ï¼š
```json
{
  "database": {
    "url": "mysql+pymysql://用户名:密码@数据库地址:端口/数据库名"
  },
  "redis": {
    "url": "redis://Redis地址:端口/数据库编号"
  },
  "ecloud": {
    "base_url": "E云管家API地址",
    "authorization": "E云管家授权令牌"
  },
  "dify": {
    "base_url": "DifyAI API地址",
    "api_key": "DifyAI API密钥"
  },
  "server": {
    "host": "0.0.0.0",
    "port": 7979,
    "debug": false
  }
}
```
#### 2.2 é…ç½®è¯´æ˜Ž
- **database.url**: MySQL数据库连接字符串
- **redis.url**: Redis服务器连接字符串
- **ecloud**: E云管家相关配置
- **dify**: DifyAI相关配置
- **server**: æœåŠ¡å™¨ç›‘å¬é…ç½®
### 3. å®‰è£…和启动服务
#### æ–¹å¼ä¸€ï¼šWindows服务模式(推荐)
1. **安装服务**:
   - å³é”®ç‚¹å‡» `install_service.bat`
   - é€‰æ‹©"以管理员身份运行"
   - æŒ‰ç…§æç¤ºå®Œæˆå®‰è£…
2. **管理服务**:
   ```cmd
   # å¯åŠ¨æœåŠ¡
   sc start ECloudDifyService
   # åœæ­¢æœåŠ¡
   sc stop ECloudDifyService
   # æŸ¥çœ‹æœåŠ¡çŠ¶æ€
   sc query ECloudDifyService
   # åˆ é™¤æœåŠ¡
   sc delete ECloudDifyService
   ```
3. **卸载服务**:
   - å³é”®ç‚¹å‡» `uninstall_service.bat`
   - é€‰æ‹©"以管理员身份运行"
#### æ–¹å¼äºŒï¼šç›´æŽ¥è¿è¡Œæ¨¡å¼
1. åŒå‡» `start.bat` å¯åŠ¨æœåŠ¡
2. æŒ‰ Ctrl+C åœæ­¢æœåŠ¡
### 4. éªŒè¯éƒ¨ç½²
1. **检查服务状态**:
   - è®¿é—® `http://服务器IP:7979/health`
   - åº”该返回健康检查信息
2. **查看日志**:
   - æ—¥å¿—文件位于 `logs/app.log`
   - æ£€æŸ¥æ˜¯å¦æœ‰é”™è¯¯ä¿¡æ¯
3. **测试API**:
   - è®¿é—® `http://服务器IP:7979/`
   - åº”该返回服务信息
## é˜²ç«å¢™é…ç½®
确保Windows防火墙允许端口7979的入站连接:
```cmd
# æ·»åŠ é˜²ç«å¢™è§„åˆ™
netsh advfirewall firewall add rule name="ECloudDify Service" dir=in action=allow protocol=TCP localport=7979
```
## æ•…障排除
### å¸¸è§é—®é¢˜
1. **服务启动失败**:
   - æ£€æŸ¥é…ç½®æ–‡ä»¶æ ¼å¼æ˜¯å¦æ­£ç¡®
   - ç¡®è®¤æ•°æ®åº“å’ŒRedis连接是否正常
   - æŸ¥çœ‹æ—¥å¿—文件获取详细错误信息
2. **端口被占用**:
   - ä¿®æ”¹é…ç½®æ–‡ä»¶ä¸­çš„端口号
   - æˆ–者停止占用端口的其他程序
3. **权限问题**:
   - ç¡®ä¿ä»¥ç®¡ç†å‘˜èº«ä»½è¿è¡Œå®‰è£…脚本
   - æ£€æŸ¥exe文件的执行权限
4. **网络连接问题**:
   - æ£€æŸ¥é˜²ç«å¢™è®¾ç½®
   - ç¡®è®¤ç½‘络连接正常
### æ—¥å¿—查看
日志文件位置:`logs/app.log`
常用日志级别:
- INFO: ä¸€èˆ¬ä¿¡æ¯
- WARNING: è­¦å‘Šä¿¡æ¯
- ERROR: é”™è¯¯ä¿¡æ¯
## æ€§èƒ½ä¼˜åŒ–
1. **内存优化**:
   - ç›‘控内存使用情况
   - å¿…要时调整系统内存分配
2. **网络优化**:
   - ç¡®ä¿ç½‘络延迟较低
   - ä½¿ç”¨é«˜é€Ÿç½‘络连接
3. **数据库优化**:
   - ä¼˜åŒ–数据库连接池设置
   - å®šæœŸæ¸…理日志数据
## å®‰å…¨å»ºè®®
1. **配置文件安全**:
   - ä¿æŠ¤é…ç½®æ–‡ä»¶ä¸­çš„æ•æ„Ÿä¿¡æ¯
   - è®¾ç½®é€‚当的文件权限
2. **网络安全**:
   - ä½¿ç”¨HTTPS(如果支持)
   - é™åˆ¶è®¿é—®IP范围
3. **定期更新**:
   - å®šæœŸæ›´æ–°æœåŠ¡ç‰ˆæœ¬
   - å…³æ³¨å®‰å…¨è¡¥ä¸
## è”系支持
如遇到问题,请提供以下信息:
- é”™è¯¯æ—¥å¿—内容
- é…ç½®æ–‡ä»¶ï¼ˆéšè—æ•æ„Ÿä¿¡æ¯ï¼‰
- ç³»ç»ŸçŽ¯å¢ƒä¿¡æ¯
- é—®é¢˜å¤çŽ°æ­¥éª¤
EÔÆ¹Ü¼Ò½Ó¿ÚÎĵµ.txt
@@ -16,7 +16,7 @@
参数名    å¿…选    ç±»åž‹    è¯´æ˜Ž
wId    æ˜¯    String    ç™»å½•实例标识
wcId    æ˜¯    String    å¥½å‹å¾®ä¿¡id/群id
wcId    æ˜¯    String    å¥½å‹å¾®ä¿¡id/群id,多个使用英文逗号分隔
请求参数示例
@@ -167,4 +167,176 @@
    "message": "失败",
    "code": "1001",
    "data": null
}
}
# åˆå§‹åŒ–通讯录列表接口
简要描述:
初始化通讯录列表
请求URL:
http://域名地址/initAddressList
请求方式:
POST
请求头Headers:
Content-Type:application/json
Authorization:{Authorization}
参数:
参数名    å¿…选    ç±»åž‹    è¯´æ˜Ž
wId    æ˜¯    String    ç™»å½•实例标识
请求参数示例
{
    "wId": "6a696578-16ea-4edc-ac8b-e609bca39c69"
}
成功返回示例
{
    "message": "成功",
    "code": "1000",
    "data": null
}
错误返回示例
{
    "message": "失败",
    "code": "1001",
    "data": null
}
返回数据:
参数名    ç±»åž‹    è¯´æ˜Ž
code    string    1000成功、1001失败
msg    string    åé¦ˆä¿¡æ¯
data    JSONObject    æ— 
# èŽ·å–é€šè®¯å½•åˆ—è¡¨æŽ¥å£
简要描述:
获取通讯录列表
请求URL:
http://域名地址/getAddressList
请求方式:
POST
请求头Headers:
Content-Type:application/json
Authorization:{Authorization}
参数:
参数名    å¿…选    ç±»åž‹    è¯´æ˜Ž
wId    æ˜¯    String    ç™»å½•实例标识
小提示:
获取通讯录列表之前,必须调用初始化通讯录列表接口。
请求参数示例
{
    "wId": "6a696578-16ea-4edc-ac8b-e609bca39c69"
}
成功返回示例
{
    "code": "1000",
    "message": "获取通讯录成功",
    "data": {
        "chatrooms": [
            ""
        ],
        "friends": [
            ""
        ],
        "ghs": [
            ""
        ],
        "others": [
            ""
        ]
    }
}
错误返回示例
{
    "message": "失败",
    "code": "1001",
    "data": null
}
返回数据:
参数名    ç±»åž‹    è¯´æ˜Ž
code    String    1000成功
1001失败
msg    String    åé¦ˆä¿¡æ¯
data    JSONObject
chatrooms    JSONArray    ç¾¤ç»„列表
friends    JSONArray    å¥½å‹åˆ—表
ghs    JSONArray    å…¬ä¼—号列表
others    JSONArray    å¾®ä¿¡å…¶ä»–相关
# ç¾¤èŠ@接口
请求URL:
http://域名地址/sendText
请求方式:
POST
请求头Headers:
Content-Type:application/json
Authorization:{Authorization}
参数:
参数名    å¿…选    ç±»åž‹    è¯´æ˜Ž
wId    æ˜¯    string    ç™»å½•实例标识
wcId    æ˜¯    string    æŽ¥æ”¶æ–¹ç¾¤id
content    æ˜¯    string    æ–‡æœ¬å†…容消息(@的微信昵称需要自己拼接,必须拼接艾特符号,不然不生效)
at    æ˜¯    string    è‰¾ç‰¹çš„微信id(多个以逗号分开)
返回数据:
参数名    ç±»åž‹    è¯´æ˜Ž
code    string    1000成功,1001失败
msg    string    åé¦ˆä¿¡æ¯
data
data.type    int    ç±»åž‹
data.msgId    long    æ¶ˆæ¯msgId
data.newMsgId    long    æ¶ˆæ¯newMsgId
data.createTime    long    æ¶ˆæ¯å‘送时间戳
data.wcId    string    æ¶ˆæ¯æŽ¥æ”¶æ–¹id
请求参数示例
{
 "wId": "0000016f-8911-484a-0001-db2943fc2786",
 "wcId": "22270365143@chatroom",
 "at": "wxid_lr6j4nononb921,wxid_i6qsbbjenjuj22",
 "content": "@E云Team_Mr Li@你微笑时真美 æµ‹è¯•"
}
成功返回示例
{
    "code": "1000",
    "message": "处理成功",
    "data": {
        "type": 1,
        "msgId": 2562652205,
        "newMsgId": 4482117376572170921,
        "createTime": 1641457769,
        "wcId": "22270365143@chatroom"
    }
}
错误返回示例
{
    "message": "失败",
    "code": "1001",
    "data": null
}
README.md
@@ -143,13 +143,14 @@
### conversations(对话记录表)
- `id`: ä¸»é”®
- `user_conversation_key`: ç”¨æˆ·å¯¹è¯å”¯ä¸€é”®
- `from_user`: å‘送用户微信ID
- `conversation_id`: Dify对话ID
- `user_question`: ç”¨æˆ·æé—®
- `ai_answer`: AI回答
- `group`: ç¾¤ç»„ID
- `hour`: å°æ—¶æ ‡è¯†(YYYYMMDD_HH)
- `content`: å¯¹è¯å†…容(JSON格式)
- `is_processed`: æ˜¯å¦å·²å¤„理
- `is_sent`: æ˜¯å¦å·²å‘送
- å”¯ä¸€æ€§çº¦æŸ: (from_user, conversation_id, group, hour)
## é…ç½®è¯´æ˜Ž
app/api/callback.py
@@ -54,10 +54,13 @@
        # å°†æ¶ˆæ¯åŠ å…¥é˜Ÿåˆ—
        success = message_processor.enqueue_callback_message(callback_dict)
        logger.info(f"消息入队结果: success={success}")
        if success:
            # èŽ·å–å‘é€ç”¨æˆ·ID
            from_user = callback_dict.get("data", {}).get("fromUser")
            if from_user:
                logger.info(f"启动用户队列处理: from_user={from_user}")
                # å¯åŠ¨ç”¨æˆ·é˜Ÿåˆ—å¤„ç†
                message_worker.process_user_queue(from_user)
@@ -65,6 +68,7 @@
                success=True, message="消息已成功加入处理队列", code=200
            )
        else:
            logger.warning("消息处理失败,未能加入队列")
            return CallbackResponse(success=False, message="消息处理失败", code=400)
    except Exception as e:
app/models/conversation.py
@@ -1,38 +1,51 @@
"""
对话记录模型
"""
from sqlalchemy import Column, String, Integer, DateTime, Text, Boolean
from sqlalchemy import Column, String, Integer, DateTime, Text, Boolean, UniqueConstraint
from sqlalchemy.sql import func
from .database import Base
class Conversation(Base):
    """对话记录表"""
    __tablename__ = "conversations"
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    user_conversation_key = Column(String(200), unique=True, index=True, nullable=False,
                                 comment="用户对话唯一键(fromUser+conversation_id)")
    from_user = Column(String(100), index=True, nullable=False, comment="发送用户微信ID")
    from_user = Column(
        String(100), index=True, nullable=False, comment="发送用户微信ID"
    )
    conversation_id = Column(String(100), nullable=False, comment="Dify对话ID")
    # ç”¨æˆ·é—®é¢˜
    user_question = Column(Text, nullable=False, comment="用户提问内容")
    # AI回答
    ai_answer = Column(Text, nullable=True, comment="AI回答内容")
    group = Column(String(100), nullable=False, comment="群组ID")
    hour = Column(String(20), nullable=False, comment="小时标识(YYYYMMDD_HH)")
    # å¯¹è¯å†…容(JSON格式)
    content = Column(
        Text,
        nullable=True,
        comment='对话内容JSON格式,如[{"user":"你好","ai":"你好有什么能帮助你的"}]',
    )
    # æ¶ˆæ¯çŠ¶æ€
    is_processed = Column(Boolean, default=False, comment="是否已处理")
    is_sent = Column(Boolean, default=False, comment="是否已发送回复")
    # æ—¶é—´æˆ³
    question_time = Column(DateTime, default=func.now(), comment="提问时间")
    answer_time = Column(DateTime, nullable=True, comment="回答时间")
    sent_time = Column(DateTime, nullable=True, comment="发送时间")
    created_at = Column(DateTime, default=func.now(), comment="创建时间")
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now(), comment="更新时间")
    updated_at = Column(
        DateTime, default=func.now(), onupdate=func.now(), comment="更新时间"
    )
    # æ·»åŠ å››å­—æ®µç»„åˆçš„å”¯ä¸€æ€§çº¦æŸ
    __table_args__ = (
        UniqueConstraint('from_user', 'conversation_id', 'group', 'hour',
                         name='uq_conversation_key'),
    )
    def __repr__(self):
        return f"<Conversation(from_user='{self.from_user}', conversation_id='{self.conversation_id}')>"
        return f"<Conversation(from_user='{self.from_user}', conversation_id='{self.conversation_id}', group='{self.group}', hour='{self.hour}')>"
app/services/contact_sync.py
New file
@@ -0,0 +1,167 @@
"""
联系人同步服务
"""
from typing import List, Optional
from loguru import logger
from sqlalchemy.orm import Session
from app.models.contact import Contact
from app.models.database import get_db
from app.services.ecloud_client import ecloud_client
class ContactSyncService:
    """联系人同步服务"""
    def __init__(self):
        pass
    def sync_contacts_on_startup(self, w_id: str) -> bool:
        """
        å¯åŠ¨æ—¶åŒæ­¥è”ç³»äººä¿¡æ¯
        Args:
            w_id: ç™»å½•实例标识
        Returns:
            åŒæ­¥æˆåŠŸè¿”å›žTrue,失败返回False
        """
        try:
            logger.info(f"开始同步联系人信息: wId={w_id}")
            # 1. åˆå§‹åŒ–通讯录列表
            if not ecloud_client.init_address_list(w_id):
                logger.error(f"初始化通讯录列表失败: wId={w_id}")
                return False
            # 2. èŽ·å–é€šè®¯å½•åˆ—è¡¨
            address_data = ecloud_client.get_address_list(w_id)
            if not address_data:
                logger.error(f"获取通讯录列表失败: wId={w_id}")
                return False
            # 3. èŽ·å–å¥½å‹åˆ—è¡¨ä¸­çš„wcid
            friends = address_data.get("friends", [])
            if not friends:
                logger.warning(f"好友列表为空: wId={w_id}")
                return True
            logger.info(f"获取到好友列表: wId={w_id}, count={len(friends)}")
            # 4. æ‰¹é‡èŽ·å–è”ç³»äººè¯¦ç»†ä¿¡æ¯
            return self._batch_sync_contacts(w_id, friends)
        except Exception as e:
            logger.error(f"同步联系人信息异常: wId={w_id}, error={str(e)}")
            return False
    def _batch_sync_contacts(self, w_id: str, wc_ids: List[str]) -> bool:
        """
        æ‰¹é‡åŒæ­¥è”系人信息
        Args:
            w_id: ç™»å½•实例标识
            wc_ids: å¾®ä¿¡ID列表
        Returns:
            åŒæ­¥æˆåŠŸè¿”å›žTrue,失败返回False
        """
        try:
            # å°†wcid列表用逗号拼接
            wc_ids_str = ",".join(wc_ids)
            logger.info(f"开始批量获取联系人信息: wId={w_id}, wc_ids_count={len(wc_ids)}")
            # è°ƒç”¨èŽ·å–è”ç³»äººä¿¡æ¯æŽ¥å£
            contact_info = ecloud_client.get_contact_info(w_id, wc_ids_str)
            if not contact_info:
                logger.error(f"批量获取联系人信息失败: wId={w_id}")
                return False
            # å¦‚果返回的是单个联系人信息,转换为列表
            if isinstance(contact_info, dict):
                contact_list = [contact_info]
            else:
                contact_list = contact_info
            logger.info(f"获取到联系人详细信息: count={len(contact_list)}")
            # ä¿å­˜åˆ°æ•°æ®åº“
            return self._save_contacts_to_db(contact_list)
        except Exception as e:
            logger.error(f"批量同步联系人信息异常: wId={w_id}, error={str(e)}")
            return False
    def _save_contacts_to_db(self, contact_list: List[dict]) -> bool:
        """
        ä¿å­˜è”系人信息到数据库
        Args:
            contact_list: è”系人信息列表
        Returns:
            ä¿å­˜æˆåŠŸè¿”å›žTrue,失败返回False
        """
        try:
            with next(get_db()) as db:
                saved_count = 0
                updated_count = 0
                for contact_data in contact_list:
                    wc_id = contact_data.get("userName") or contact_data.get("wcId")
                    if not wc_id:
                        logger.warning(f"联系人信息缺少wcId: {contact_data}")
                        continue
                    # æ£€æŸ¥æ˜¯å¦å·²å­˜åœ¨
                    existing_contact = db.query(Contact).filter(Contact.wc_id == wc_id).first()
                    if existing_contact:
                        # æ›´æ–°çŽ°æœ‰è”ç³»äººä¿¡æ¯
                        existing_contact.user_name = contact_data.get("userName")
                        existing_contact.nick_name = contact_data.get("nickName")
                        existing_contact.remark = contact_data.get("remark")
                        existing_contact.signature = contact_data.get("signature")
                        existing_contact.sex = contact_data.get("sex")
                        existing_contact.alias_name = contact_data.get("aliasName")
                        existing_contact.country = contact_data.get("country")
                        existing_contact.big_head = contact_data.get("bigHead")
                        existing_contact.small_head = contact_data.get("smallHead")
                        existing_contact.label_list = contact_data.get("labelList")
                        existing_contact.v1 = contact_data.get("v1")
                        updated_count += 1
                        logger.debug(f"更新联系人信息: wc_id={wc_id}, nick_name={contact_data.get('nickName')}")
                    else:
                        # åˆ›å»ºæ–°è”系人记录
                        new_contact = Contact(
                            wc_id=wc_id,
                            user_name=contact_data.get("userName"),
                            nick_name=contact_data.get("nickName"),
                            remark=contact_data.get("remark"),
                            signature=contact_data.get("signature"),
                            sex=contact_data.get("sex"),
                            alias_name=contact_data.get("aliasName"),
                            country=contact_data.get("country"),
                            big_head=contact_data.get("bigHead"),
                            small_head=contact_data.get("smallHead"),
                            label_list=contact_data.get("labelList"),
                            v1=contact_data.get("v1"),
                        )
                        db.add(new_contact)
                        saved_count += 1
                        logger.debug(f"新增联系人信息: wc_id={wc_id}, nick_name={contact_data.get('nickName')}")
                # æäº¤äº‹åŠ¡
                db.commit()
                logger.info(f"联系人信息保存完成: æ–°å¢ž={saved_count}, æ›´æ–°={updated_count}")
                return True
        except Exception as e:
            logger.error(f"保存联系人信息到数据库异常: error={str(e)}")
            if 'db' in locals():
                db.rollback()
            return False
# å…¨å±€è”系人同步服务实例
contact_sync_service = ContactSyncService()
app/services/ecloud_client.py
@@ -3,7 +3,7 @@
"""
import requests
from typing import Optional, Dict, Any
from typing import Optional, Dict, Any, List
from loguru import logger
from config import settings
@@ -26,10 +26,12 @@
        Args:
            w_id: ç™»å½•实例标识
            wc_id: å¥½å‹å¾®ä¿¡id/群id
            wc_id: å¥½å‹å¾®ä¿¡id/群id,多个使用英文逗号分隔
        Returns:
            è”系人信息字典,失败返回None
            è”系人信息字典或列表,失败返回None
            - å•个wcId时返回字典
            - å¤šä¸ªwcId时返回列表
        """
        try:
            url = f"{self.base_url}/getContact"
@@ -45,8 +47,14 @@
            if result.get("code") == "1000":
                contact_data = result.get("data", [])
                if contact_data and len(contact_data) > 0:
                    logger.info(f"成功获取联系人信息: wcId={wc_id}")
                    return contact_data[0]  # è¿”回第一个联系人信息
                    # å¦‚果是单个wcId,返回第一个联系人信息
                    if "," not in wc_id:
                        logger.info(f"成功获取联系人信息: wcId={wc_id}")
                        return contact_data[0]
                    else:
                        # å¦‚果是多个wcId,返回完整列表
                        logger.info(f"成功获取批量联系人信息: count={len(contact_data)}")
                        return contact_data
                else:
                    logger.warning(f"联系人信息为空: wcId={wc_id}")
                    return None
@@ -63,7 +71,7 @@
            logger.error(f"获取联系人信息异常: wcId={wc_id}, error={str(e)}")
            return None
    def send_text_message(self, w_id: str, wc_id: str, content: str) -> bool:
    def send_text_message(self, w_id: str, wc_id: str, content: str, max_retries: int = None) -> bool:
        """
        å‘送文本消息
@@ -71,38 +79,55 @@
            w_id: ç™»å½•实例标识
            wc_id: æŽ¥æ”¶äººå¾®ä¿¡id/群id
            content: æ–‡æœ¬å†…容消息
            max_retries: æœ€å¤§é‡è¯•次数
        Returns:
            å‘送成功返回True,失败返回False
        """
        try:
            url = f"{self.base_url}/sendText"
            payload = {"wId": w_id, "wcId": wc_id, "content": content}
        if max_retries is None:
            from config import settings
            max_retries = settings.max_retry_count
        retry_count = 0
        while retry_count <= max_retries:
            try:
                url = f"{self.base_url}/sendText"
                payload = {"wId": w_id, "wcId": wc_id, "content": content}
            logger.info(
                f"发送文本消息: wId={w_id}, wcId={wc_id}, content_length={len(content)}"
            )
            response = self.session.post(url, json=payload, timeout=30)
            response.raise_for_status()
            result = response.json()
            if result.get("code") == "1000":
                logger.info(f"文本消息发送成功: wcId={wc_id}")
                return True
            else:
                logger.error(
                    f"文本消息发送失败: wcId={wc_id}, code={result.get('code')}, message={result.get('message')}"
                logger.info(
                    f"发送文本消息: wId={w_id}, wcId={wc_id}, content_length={len(content)}, retry={retry_count}"
                )
                return False
        except requests.exceptions.RequestException as e:
            logger.error(f"发送文本消息网络错误: wcId={wc_id}, error={str(e)}")
            return False
        except Exception as e:
            logger.error(f"发送文本消息异常: wcId={wc_id}, error={str(e)}")
            return False
                response = self.session.post(url, json=payload, timeout=30)
                response.raise_for_status()
                result = response.json()
                if result.get("code") == "1000":
                    logger.info(f"文本消息发送成功: wcId={wc_id}")
                    return True
                else:
                    logger.error(
                        f"文本消息发送失败: wcId={wc_id}, code={result.get('code')}, message={result.get('message')}"
                    )
            except requests.exceptions.RequestException as e:
                logger.error(f"发送文本消息网络错误: wcId={wc_id}, retry={retry_count}, error={str(e)}")
            except Exception as e:
                logger.error(f"发送文本消息异常: wcId={wc_id}, retry={retry_count}, error={str(e)}")
            retry_count += 1
            if retry_count <= max_retries:
                from config import settings
                wait_time = settings.retry_delay * retry_count
                logger.info(f"等待重试: wcId={wc_id}, wait_time={wait_time}s")
                import time
                time.sleep(wait_time)
        logger.error(
            f"文本消息发送失败,已达最大重试次数: wcId={wc_id}, max_retries={max_retries}"
        )
        return False
    def send_group_message(self, w_id: str, group_id: str, content: str) -> bool:
        """
@@ -118,6 +143,153 @@
        """
        return self.send_text_message(w_id, group_id, content)
    def init_address_list(self, w_id: str) -> bool:
        """
        åˆå§‹åŒ–通讯录列表
        Args:
            w_id: ç™»å½•实例标识
        Returns:
            åˆå§‹åŒ–成功返回True,失败返回False
        """
        try:
            url = f"{self.base_url}/initAddressList"
            payload = {"wId": w_id}
            logger.info(f"初始化通讯录列表: wId={w_id}")
            response = self.session.post(url, json=payload, timeout=30)
            response.raise_for_status()
            result = response.json()
            if result.get("code") == "1000":
                logger.info(f"初始化通讯录列表成功: wId={w_id}")
                return True
            else:
                logger.error(
                    f"初始化通讯录列表失败: wId={w_id}, code={result.get('code')}, message={result.get('message')}"
                )
                return False
        except requests.exceptions.RequestException as e:
            logger.error(f"初始化通讯录列表网络错误: wId={w_id}, error={str(e)}")
            return False
        except Exception as e:
            logger.error(f"初始化通讯录列表异常: wId={w_id}, error={str(e)}")
            return False
    def get_address_list(self, w_id: str) -> Optional[Dict[str, Any]]:
        """
        èŽ·å–é€šè®¯å½•åˆ—è¡¨
        Args:
            w_id: ç™»å½•实例标识
        Returns:
            é€šè®¯å½•数据字典,失败返回None
            è¿”回格式: {
                "chatrooms": [...],  # ç¾¤ç»„列表
                "friends": [...],    # å¥½å‹åˆ—表
                "ghs": [...],        # å…¬ä¼—号列表
                "others": [...]      # å…¶ä»–
            }
        """
        try:
            url = f"{self.base_url}/getAddressList"
            payload = {"wId": w_id}
            logger.info(f"获取通讯录列表: wId={w_id}")
            response = self.session.post(url, json=payload, timeout=30)
            response.raise_for_status()
            result = response.json()
            if result.get("code") == "1000":
                address_data = result.get("data", {})
                logger.info(f"成功获取通讯录列表: wId={w_id}, friends_count={len(address_data.get('friends', []))}")
                return address_data
            else:
                logger.error(
                    f"获取通讯录列表失败: wId={w_id}, code={result.get('code')}, message={result.get('message')}"
                )
                return None
        except requests.exceptions.RequestException as e:
            logger.error(f"获取通讯录列表网络错误: wId={w_id}, error={str(e)}")
            return None
        except Exception as e:
            logger.error(f"获取通讯录列表异常: wId={w_id}, error={str(e)}")
            return None
    def send_group_at_message(self, w_id: str, wc_id: str, content: str, at_wc_ids: List[str], max_retries: int = None) -> bool:
        """
        å‘送群聊@消息
        Args:
            w_id: ç™»å½•实例标识
            wc_id: æŽ¥æ”¶æ–¹ç¾¤id
            content: æ–‡æœ¬å†…容消息(@的微信昵称需要自己拼接,必须拼接艾特符号,不然不生效)
            at_wc_ids: è‰¾ç‰¹çš„微信id列表
            max_retries: æœ€å¤§é‡è¯•次数
        Returns:
            å‘送成功返回True,失败返回False
        """
        if max_retries is None:
            from config import settings
            max_retries = settings.max_retry_count
        retry_count = 0
        while retry_count <= max_retries:
            try:
                url = f"{self.base_url}/sendText"
                # å°†at_wc_ids列表用逗号拼接
                at_str = ",".join(at_wc_ids)
                payload = {
                    "wId": w_id,
                    "wcId": wc_id,
                    "content": content,
                    "at": at_str
                }
                logger.info(
                    f"发送群聊@消息: wId={w_id}, wcId={wc_id}, at={at_str}, content_length={len(content)}, retry={retry_count}"
                )
                response = self.session.post(url, json=payload, timeout=30)
                response.raise_for_status()
                result = response.json()
                if result.get("code") == "1000":
                    logger.info(f"群聊@消息发送成功: wcId={wc_id}, at={at_str}")
                    return True
                else:
                    logger.error(
                        f"群聊@消息发送失败: wcId={wc_id}, at={at_str}, code={result.get('code')}, message={result.get('message')}"
                    )
            except requests.exceptions.RequestException as e:
                logger.error(f"发送群聊@消息网络错误: wcId={wc_id}, at={at_str}, retry={retry_count}, error={str(e)}")
            except Exception as e:
                logger.error(f"发送群聊@消息异常: wcId={wc_id}, at={at_str}, retry={retry_count}, error={str(e)}")
            retry_count += 1
            if retry_count <= max_retries:
                from config import settings
                wait_time = settings.retry_delay * retry_count
                logger.info(f"等待重试: wcId={wc_id}, wait_time={wait_time}s")
                import time
                time.sleep(wait_time)
        logger.error(
            f"群聊@消息发送失败,已达最大重试次数: wcId={wc_id}, at={at_str}, max_retries={max_retries}"
        )
        return False
# å…¨å±€E云管家客户端实例
ecloud_client = ECloudClient()
app/services/message_processor.py
@@ -3,7 +3,10 @@
"""
import json
from typing import Dict, Any, Optional
import time
import re
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime
from sqlalchemy.orm import Session
from loguru import logger
@@ -13,6 +16,7 @@
from app.services.redis_queue import redis_queue
from app.services.ecloud_client import ecloud_client
from app.services.dify_client import dify_client
from config import settings
class MessageProcessor:
@@ -20,6 +24,51 @@
    def __init__(self):
        pass
    def parse_at_mentions(self, ai_answer: str) -> Tuple[str, List[str]]:
        """
        è§£æžAI回复中的@字符,提取客服名称
        Args:
            ai_answer: AI回复内容
        Returns:
            (处理后的消息内容, éœ€è¦@的客服wcid列表)
        """
        try:
            # èŽ·å–é…ç½®çš„å®¢æœåç§°åˆ—è¡¨
            customer_service_names = settings.customer_service_names
            # æŸ¥æ‰¾æ‰€æœ‰@字符后的客服名称
            at_pattern = r'@([^\s]+)'
            matches = re.findall(at_pattern, ai_answer)
            valid_at_names = []
            at_wc_ids = []
            for match in matches:
                # æ£€æŸ¥æ˜¯å¦åœ¨é…ç½®çš„客服名称列表中
                if match in customer_service_names:
                    valid_at_names.append(match)
                    logger.info(f"发现有效的@客服名称: {match}")
            # å¦‚果有有效的@客服名称,查询数据库获取wcid
            if valid_at_names:
                with next(get_db()) as db:
                    for name in valid_at_names:
                        # æ ¹æ®nick_name查找联系人
                        contact = db.query(Contact).filter(Contact.nick_name == name).first()
                        if contact:
                            at_wc_ids.append(contact.wc_id)
                            logger.info(f"找到客服联系人: name={name}, wc_id={contact.wc_id}")
                        else:
                            logger.warning(f"未找到客服联系人: name={name}")
            return ai_answer, at_wc_ids
        except Exception as e:
            logger.error(f"解析@字符异常: error={str(e)}")
            return ai_answer, []
    def is_valid_group_message(self, callback_data: Dict[str, Any]) -> bool:
        """
@@ -151,17 +200,15 @@
            logger.info(f"开始处理消息: from_user={from_user}, from_group={from_group}")
            # èŽ·å–æ•°æ®åº“ä¼šè¯
            db = next(get_db())
            try:
            # ä½¿ç”¨ä¸Šä¸‹æ–‡ç®¡ç†å™¨ç¡®ä¿æ•°æ®åº“会话正确管理
            with next(get_db()) as db:
                # 3.1 ç¡®ä¿è”系人信息存在
                if not self.ensure_contact_exists(from_group, w_id, db):
                    logger.error(f"联系人信息处理失败: from_group={from_group}")
                    return False
                # 3.2 èŽ·å–ç”¨æˆ·çš„conversation_id
                conversation_id = redis_queue.get_conversation_id(from_user)
                # 3.2 èŽ·å–ç”¨æˆ·åœ¨å½“å‰ç¾¤ç»„çš„conversation_id
                conversation_id = redis_queue.get_conversation_id(from_user, from_group)
                # è°ƒç”¨Dify接口发送消息
                dify_response = dify_client.send_chat_message(
@@ -176,55 +223,112 @@
                ai_answer = dify_response.get("answer", "")
                new_conversation_id = dify_response.get("conversation_id", "")
                # æ›´æ–°Redis中的conversation_id
                # æ›´æ–°Redis中的conversation_id(基于用户+群组)
                if new_conversation_id:
                    redis_queue.set_conversation_id(from_user, new_conversation_id)
                    redis_queue.set_conversation_id(from_user, new_conversation_id, from_group)
                # 3.3 ä¿å­˜å¯¹è¯è®°å½•到数据库
                user_conversation_key = f"{from_user}_{new_conversation_id}"
                # æŒ‰ç”¨æˆ·ã€ç¾¤ç»„和小时分组对话记录
                current_time = datetime.now()
                hour_key = current_time.strftime("%Y%m%d_%H")
                # æ£€æŸ¥æ˜¯å¦å·²å­˜åœ¨è®°å½•
                # æŸ¥æ‰¾å½“前用户在当前群组当前小时的对话记录
                existing_conversation = (
                    db.query(Conversation)
                    .filter(Conversation.user_conversation_key == user_conversation_key)
                    .filter(
                        Conversation.from_user == from_user,
                        Conversation.conversation_id == new_conversation_id,
                        Conversation.group == from_group,
                        Conversation.hour == hour_key
                    )
                    .first()
                )
                if existing_conversation:
                    # æ›´æ–°çŽ°æœ‰è®°å½•
                    existing_conversation.user_question = content
                    existing_conversation.ai_answer = ai_answer
                    existing_conversation.is_processed = True
                    logger.info(f"更新对话记录: key={user_conversation_key}")
                    # æ›´æ–°çŽ°æœ‰è®°å½• - ä½¿ç”¨JSON格式追加对话内容(当前用户在当前群组当前小时的对话)
                    try:
                        # è§£æžçŽ°æœ‰çš„content JSON
                        if existing_conversation.content:
                            content_list = json.loads(existing_conversation.content)
                        else:
                            content_list = []
                        # è¿½åŠ æ–°çš„å¯¹è¯å†…å®¹
                        content_list.append({
                            "user": content,
                            "ai": ai_answer
                        })
                        # æ›´æ–°è®°å½•
                        existing_conversation.content = json.dumps(content_list, ensure_ascii=False)
                        existing_conversation.is_processed = True
                        logger.info(f"追加到当前用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, å¯¹è¯è½®æ¬¡={len(content_list)}")
                    except json.JSONDecodeError as e:
                        logger.error(f"解析现有对话内容JSON失败: {str(e)}, é‡æ–°åˆ›å»º")
                        # å¦‚æžœJSON解析失败,重新创建content
                        content_list = [{"user": content, "ai": ai_answer}]
                        existing_conversation.content = json.dumps(content_list, ensure_ascii=False)
                        existing_conversation.is_processed = True
                else:
                    # åˆ›å»ºæ–°è®°å½•
                    # åˆ›å»ºæ–°è®°å½• - æ–°çš„用户群组小时对话或首次对话,使用JSON格式存储对话内容
                    content_list = [{"user": content, "ai": ai_answer}]
                    new_conversation = Conversation(
                        user_conversation_key=user_conversation_key,
                        from_user=from_user,
                        conversation_id=new_conversation_id,
                        user_question=content,
                        ai_answer=ai_answer,
                        group=from_group,
                        hour=hour_key,
                        content=json.dumps(content_list, ensure_ascii=False),
                        is_processed=True,
                    )
                    db.add(new_conversation)
                    logger.info(f"创建对话记录: key={user_conversation_key}")
                    logger.info(f"创建新的用户群组小时对话记录: user={from_user}, group={from_group}, hour={hour_key}, åˆå§‹å¯¹è¯è½®æ¬¡=1")
                db.commit()
                # å‘送AI回答到群聊
                if ai_answer and ecloud_client.send_group_message(
                    w_id, from_group, ai_answer
                ):
                success = False
                if ai_answer:
                    # è§£æžAI回复中的@字符
                    processed_answer, at_wc_ids = self.parse_at_mentions(ai_answer)
                    # å‘送消息,最多重试3次
                    for attempt in range(3):
                        if at_wc_ids:
                            # å¦‚果有@客服,使用群聊@接口
                            logger.info(f"使用群聊@接口发送消息: at_wc_ids={at_wc_ids}")
                            if ecloud_client.send_group_at_message(
                                w_id, from_group, processed_answer, at_wc_ids
                            ):
                                success = True
                                break
                        else:
                            # æ™®é€šç¾¤èŠæ¶ˆæ¯
                            logger.info("使用普通群聊接口发送消息")
                            if ecloud_client.send_group_message(
                                w_id, from_group, processed_answer
                            ):
                                success = True
                                break
                        logger.warning(f"发送AI回答失败,尝试重试 ({attempt + 1}/3): from_user={from_user}")
                        if attempt < 2:  # ä¸æ˜¯æœ€åŽä¸€æ¬¡å°è¯•,等待一段时间再重试
                            time.sleep(2 ** attempt)  # æŒ‡æ•°é€€é¿
                if success:
                    # æ›´æ–°å‘送状态
                    conversation = (
                        db.query(Conversation)
                        .filter(
                            Conversation.user_conversation_key == user_conversation_key
                            Conversation.from_user == from_user,
                            Conversation.conversation_id == new_conversation_id,
                            Conversation.group == from_group,
                            Conversation.hour == hour_key
                        )
                        .first()
                    )
                    if conversation:
                        conversation.is_sent = True
                        conversation.sent_time = current_time
                        db.commit()
                    logger.info(f"消息处理完成: from_user={from_user}")
@@ -232,9 +336,6 @@
                else:
                    logger.error(f"发送AI回答失败: from_user={from_user}")
                    return False
            finally:
                db.close()
        except Exception as e:
            logger.error(f"处理消息异常: from_user={from_user}, error={str(e)}")
app/services/redis_queue.py
@@ -26,9 +26,14 @@
        """获取处理状态键"""
        return f"{self.processing_prefix}{from_user}"
    def get_conversation_key(self, from_user: str) -> str:
    def get_conversation_key(self, from_user: str, group: str = None) -> str:
        """获取用户对话ID键"""
        return f"{self.conversation_prefix}{from_user}"
        if group:
            # åŸºäºŽç”¨æˆ·+群组生成conversation_key,确保不同群组的对话独立
            return f"{self.conversation_prefix}{from_user}:{group}"
        else:
            # å…¼å®¹æ—§ç‰ˆæœ¬ï¼Œä»…基于用户
            return f"{self.conversation_prefix}{from_user}"
    def enqueue_message(self, from_user: str, message_data: Dict[str, Any]) -> bool:
        """将消息加入用户队列"""
@@ -36,8 +41,8 @@
            queue_key = self.get_user_queue_key(from_user)
            message_json = json.dumps(message_data, ensure_ascii=False)
            # ä½¿ç”¨LPUSH将消息加入队列头部
            result = self.redis_client.lpush(queue_key, message_json)
            # ä½¿ç”¨RPUSH将消息加入队列尾部,确保FIFO顺序
            result = self.redis_client.rpush(queue_key, message_json)
            logger.info(f"消息已加入队列: user={from_user}, queue_length={result}, message={message_json}")
            return True
@@ -107,28 +112,28 @@
            logger.error(f"检查处理状态失败: user={from_user}, error={str(e)}")
            return False
    def get_conversation_id(self, from_user: str) -> Optional[str]:
    def get_conversation_id(self, from_user: str, group: str = None) -> Optional[str]:
        """获取用户的对话ID"""
        try:
            conversation_key = self.get_conversation_key(from_user)
            conversation_key = self.get_conversation_key(from_user, group)
            return self.redis_client.get(conversation_key)
        except Exception as e:
            logger.error(f"获取对话ID失败: user={from_user}, error={str(e)}")
            logger.error(f"获取对话ID失败: user={from_user}, group={group}, error={str(e)}")
            return None
    def set_conversation_id(
        self, from_user: str, conversation_id: str, ttl: int = 86400
        self, from_user: str, conversation_id: str, group: str = None, ttl: int = 86400
    ) -> bool:
        """设置用户的对话ID"""
        try:
            conversation_key = self.get_conversation_key(from_user)
            conversation_key = self.get_conversation_key(from_user, group)
            self.redis_client.setex(conversation_key, ttl, conversation_id)
            logger.info(
                f"设置对话ID: user={from_user}, conversation_id={conversation_id}"
                f"设置对话ID: user={from_user}, group={group}, conversation_id={conversation_id}"
            )
            return True
        except Exception as e:
            logger.error(f"设置对话ID失败: user={from_user}, error={str(e)}")
            logger.error(f"设置对话ID失败: user={from_user}, group={group}, error={str(e)}")
            return False
    def get_queue_length(self, from_user: str) -> int:
app/utils/logger.py
@@ -28,11 +28,11 @@
        level=settings.log_level,
        format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
        rotation="1 day",
        retention="30 days",
        retention="7 days",
        compression="zip",
        encoding="utf-8"
    )
    # æ·»åŠ é”™è¯¯æ—¥å¿—æ–‡ä»¶
    error_log_file = settings.log_file.replace('.log', '_error.log')
    logger.add(
@@ -40,7 +40,7 @@
        level="ERROR",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
        rotation="1 day",
        retention="30 days",
        retention="7 days",
        compression="zip",
        encoding="utf-8"
    )
app/workers/message_worker.py
@@ -47,10 +47,34 @@
        """监控队列,为有消息的用户启动处理线程"""
        while self.running:
            try:
                # è¿™é‡Œå¯ä»¥å®žçŽ°æ›´å¤æ‚çš„é˜Ÿåˆ—å‘çŽ°æœºåˆ¶
                # ç›®å‰ç®€åŒ–为检查已知的活跃用户
                # åœ¨å®žé™…应用中,可以通过Redis的SCAN命令扫描所有队列
                # ä½¿ç”¨Redis的SCAN命令扫描所有队列键
                cursor = 0
                queue_keys = set()
                while self.running:
                    cursor, keys = self._scan_queue_keys(cursor)
                    queue_keys.update(keys)
                    # å¦‚果游标为0,表示扫描完成
                    if cursor == 0:
                        break
                # ä¸ºæ¯ä¸ªæœ‰æ¶ˆæ¯çš„队列启动处理线程
                for queue_key in queue_keys:
                    if not self.running:
                        break
                    # ä»Žé˜Ÿåˆ—键中提取用户ID
                    if queue_key.startswith(redis_queue.queue_prefix):
                        from_user = queue_key[len(redis_queue.queue_prefix):]
                        # æ£€æŸ¥é˜Ÿåˆ—是否有消息
                        queue_length = redis_queue.get_queue_length(from_user)
                        if queue_length > 0:
                            logger.info(f"发现用户队列有消息: user={from_user}, length={queue_length}")
                            # å¯åŠ¨ç”¨æˆ·é˜Ÿåˆ—å¤„ç†
                            self.process_user_queue(from_user)
                # æ¸…理已完成的线程
                self._cleanup_finished_threads()
@@ -60,6 +84,29 @@
            except Exception as e:
                logger.error(f"队列监控异常: {str(e)}")
                time.sleep(10)
    def _scan_queue_keys(self, cursor: int = 0, count: int = 100) -> tuple:
        """
        æ‰«æé˜Ÿåˆ—é”®
        Args:
            cursor: æ‰«ææ¸¸æ ‡
            count: æ¯æ¬¡æ‰«æçš„键数量
        Returns:
            (新游标, é˜Ÿåˆ—键列表)
        """
        try:
            # ä½¿ç”¨SCAN命令扫描匹配模式的键
            new_cursor, keys = redis_queue.redis_client.scan(
                cursor=cursor,
                match=f"{redis_queue.queue_prefix}*",
                count=count
            )
            return new_cursor, keys
        except Exception as e:
            logger.error(f"扫描队列键失败: {str(e)}")
            return 0, []
    def _cleanup_finished_threads(self):
        """清理已完成的线程"""
@@ -102,6 +149,10 @@
        """
        try:
            logger.info(f"开始处理用户消息队列: {from_user}")
            # èŽ·å–é˜Ÿåˆ—é•¿åº¦
            queue_length = redis_queue.get_queue_length(from_user)
            logger.info(f"用户队列初始长度: {from_user}, length={queue_length}")
            while self.running:
                # æ£€æŸ¥ç”¨æˆ·æ˜¯å¦æ­£åœ¨å¤„理中(防止并发)
@@ -132,6 +183,9 @@
                    if success:
                        logger.info(f"消息处理成功: {from_user}")
                        # æ›´æ–°é˜Ÿåˆ—长度信息
                        queue_length = redis_queue.get_queue_length(from_user)
                        logger.info(f"用户队列剩余长度: {from_user}, length={queue_length}")
                    else:
                        logger.error(f"消息处理失败: {from_user}")
                        # å¯ä»¥è€ƒè™‘将失败的消息重新入队或记录到错误队列
config.example.json
New file
@@ -0,0 +1,34 @@
{
  "database": {
    "url": "mysql+pymysql://username:password@host:port/database_name"
  },
  "redis": {
    "url": "redis://localhost:6379/0"
  },
  "ecloud": {
    "base_url": "http://your-ecloud-server:port",
    "authorization": "your_ecloud_authorization_token",
    "w_id": "your_ecloud_w_id"
  },
  "dify": {
    "base_url": "https://api.dify.ai/v1",
    "api_key": "your_dify_api_key"
  },
  "server": {
    "host": "0.0.0.0",
    "port": 7979,
    "debug": false
  },
  "logging": {
    "level": "INFO",
    "file": "logs/app.log"
  },
  "message_processing": {
    "max_retry_count": 3,
    "retry_delay": 5,
    "queue_timeout": 300
  },
  "customer_service": {
    "names": ["客服1", "客服2"]
  }
}
config.production.json
New file
@@ -0,0 +1,30 @@
{
  "database": {
    "url": "mysql+pymysql://root:password@localhost:3306/ecloud_dify"
  },
  "redis": {
    "url": "redis://localhost:6379/0"
  },
  "ecloud": {
    "base_url": "http://125.122.152.142:9899",
    "authorization": "your_ecloud_authorization_token"
  },
  "dify": {
    "base_url": "https://api.dify.ai/v1",
    "api_key": "your_dify_api_key"
  },
  "server": {
    "host": "0.0.0.0",
    "port": 7979,
    "debug": false
  },
  "logging": {
    "level": "INFO",
    "file": "logs/app.log"
  },
  "message_processing": {
    "max_retry_count": 3,
    "retry_delay": 5,
    "queue_timeout": 300
  }
}
config.py
@@ -3,44 +3,104 @@
"""
import os
from typing import Optional
from pydantic_settings import BaseSettings
import json
class Settings(BaseSettings):
class Settings:
    """应用配置"""
    # æ•°æ®åº“配置
    database_url: str = "mysql+pymysql://root:TAI%402019%23Zjun@120.24.39.179:3306/ecloud_dify"
    def __init__(self, config_file: str = "config.json"):
        """初始化配置,从JSON文件加载"""
        self.config_file = config_file
        self._load_config()
    # Redis配置
    redis_url: str = "redis://localhost:6379/0"
    def _load_config(self):
        """从JSON文件加载配置"""
        try:
            if os.path.exists(self.config_file):
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    config_data = json.load(f)
                self._set_config_from_dict(config_data)
            else:
                # å¦‚果配置文件不存在,使用默认值
                self._set_default_config()
        except Exception as e:
            print(f"加载配置文件失败: {e}")
            self._set_default_config()
    # E云管家配置
    ecloud_base_url: str = "http://your-ecloud-domain.com"
    ecloud_authorization: str = "your-authorization-token"
    def _set_config_from_dict(self, config_data: dict):
        """从字典设置配置"""
        # æ•°æ®åº“配置
        self.database_url = config_data.get("database", {}).get("url", "mysql+pymysql://root:TAI%402019%23Zjun@120.24.39.179:3306/ecloud_dify")
    # DifyAI配置
    dify_base_url: str = "https://api.dify.ai/v1"
    dify_api_key: str = "your-dify-api-key"
        # Redis配置
        self.redis_url = config_data.get("redis", {}).get("url", "redis://localhost:6379/0")
    # æœåŠ¡é…ç½®
    server_host: str = "0.0.0.0"
    server_port: int = 8000
    debug: bool = True
        # E云管家配置
        ecloud_config = config_data.get("ecloud", {})
        self.ecloud_base_url = ecloud_config.get("base_url", "http://125.122.152.142:9899")
        self.ecloud_authorization = ecloud_config.get("authorization", "eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzYxMTQ1MjE3NSIsInBhc3N3b3JkIjoiJDJhJDEwJEU3Ry5LOEJzekphM2JGQlh0SG8vOXVrUk1NalVweGVVemguUDRnMkJBdHN2YXpBb0JIQWJpIn0.Gd2vbeJjL5pUGFhUngWPLkDTLhD3GUaEPXOkdoTf4KRh9o2FtST1OZJxmZuGdUy7WIYlIPVueoVyIu5iHOyi8A")
        self.ecloud_w_id = ecloud_config.get("w_id", "")
    # æ—¥å¿—配置
    log_level: str = "INFO"
    log_file: str = "logs/app.log"
        # DifyAI配置
        dify_config = config_data.get("dify", {})
        self.dify_base_url = dify_config.get("base_url", "https://api.dify.ai/v1")
        self.dify_api_key = dify_config.get("api_key", "app-OMnBr7zsf5UTV83Ey8QcSErA")
    # æ¶ˆæ¯å¤„理配置
    max_retry_count: int = 3
    retry_delay: int = 5  # ç§’
    queue_timeout: int = 300  # ç§’
        # æœåŠ¡é…ç½®
        server_config = config_data.get("server", {})
        self.server_host = server_config.get("host", "0.0.0.0")
        self.server_port = server_config.get("port", 7979)
        self.debug = server_config.get("debug", True)
    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"
        # æ—¥å¿—配置
        logging_config = config_data.get("logging", {})
        self.log_level = logging_config.get("level", "INFO")
        self.log_file = logging_config.get("file", "logs/app.log")
        # æ¶ˆæ¯å¤„理配置
        msg_config = config_data.get("message_processing", {})
        self.max_retry_count = msg_config.get("max_retry_count", 3)
        self.retry_delay = msg_config.get("retry_delay", 5)
        self.queue_timeout = msg_config.get("queue_timeout", 300)
        # å®¢æœé…ç½®
        customer_service_config = config_data.get("customer_service", {})
        self.customer_service_names = customer_service_config.get("names", ["客服1", "客服2"])
    def _set_default_config(self):
        """设置默认配置"""
        # æ•°æ®åº“配置
        self.database_url = "mysql+pymysql://root:TAI%402019%23Zjun@120.24.39.179:3306/ecloud_dify"
        # Redis配置
        self.redis_url = "redis://localhost:6379/0"
        # E云管家配置
        self.ecloud_base_url = "http://125.122.152.142:9899"
        self.ecloud_authorization = "eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzYxMTQ1MjE3NSIsInBhc3N3b3JkIjoiJDJhJDEwJEU3Ry5LOEJzekphM2JGQlh0SG8vOXVrUk1NalVweGVVemguUDRnMkJBdHN2YXpBb0JIQWJpIn0.Gd2vbeJjL5pUGFhUngWPLkDTLhD3GUaEPXOkdoTf4KRh9o2FtST1OZJxmZuGdUy7WIYlIPVueoVyIu5iHOyi8A"
        self.ecloud_w_id = ""
        # DifyAI配置
        self.dify_base_url = "https://api.dify.ai/v1"
        self.dify_api_key = "app-OMnBr7zsf5UTV83Ey8QcSErA"
        # æœåŠ¡é…ç½®
        self.server_host = "0.0.0.0"
        self.server_port = 7979
        self.debug = True
        # æ—¥å¿—配置
        self.log_level = "INFO"
        self.log_file = "logs/app.log"
        # æ¶ˆæ¯å¤„理配置
        self.max_retry_count = 3
        self.retry_delay = 5
        self.queue_timeout = 300
        # å®¢æœé…ç½®
        self.customer_service_names = ["客服1", "客服2"]
# å…¨å±€é…ç½®å®žä¾‹
ecloud_dify.spec
New file
@@ -0,0 +1,127 @@
# -*- mode: python ; coding: utf-8 -*-
import os
import sys
from PyInstaller.utils.hooks import collect_data_files, collect_submodules
# èŽ·å–é¡¹ç›®æ ¹ç›®å½•
project_root = os.path.abspath('.')
# æ”¶é›†æ‰€æœ‰æ•°æ®æ–‡ä»¶
datas = []
# æ·»åŠ é…ç½®æ–‡ä»¶
datas.append(('config.example.json', '.'))
datas.append(('config.production.json', '.'))
if os.path.exists('config.json'):
    datas.append(('config.json', '.'))
# æ·»åŠ æ—¥å¿—ç›®å½•
if os.path.exists('logs'):
    datas.append(('logs', 'logs'))
# æ”¶é›†FastAPI和其他包的数据文件
datas.extend(collect_data_files('fastapi'))
datas.extend(collect_data_files('uvicorn'))
datas.extend(collect_data_files('pydantic'))
datas.extend(collect_data_files('sqlalchemy'))
# æ”¶é›†éšè—å¯¼å…¥
hiddenimports = []
hiddenimports.extend(collect_submodules('uvicorn'))
hiddenimports.extend(collect_submodules('fastapi'))
hiddenimports.extend(collect_submodules('pydantic'))
hiddenimports.extend(collect_submodules('sqlalchemy'))
hiddenimports.extend(collect_submodules('pymysql'))
hiddenimports.extend(collect_submodules('redis'))
hiddenimports.extend(collect_submodules('celery'))
hiddenimports.extend(collect_submodules('loguru'))
hiddenimports.extend(collect_submodules('cryptography'))
hiddenimports.extend(collect_submodules('requests'))
hiddenimports.extend(collect_submodules('httpx'))
# æ·»åŠ ç‰¹å®šçš„éšè—å¯¼å…¥
hiddenimports.extend([
    'uvicorn.lifespan.on',
    'uvicorn.lifespan.off',
    'uvicorn.protocols.websockets.auto',
    'uvicorn.protocols.websockets.websockets_impl',
    'uvicorn.protocols.http.auto',
    'uvicorn.protocols.http.h11_impl',
    'uvicorn.protocols.http.httptools_impl',
    'uvicorn.loops.auto',
    'uvicorn.loops.asyncio',
    'uvicorn.logging',
    'pymysql.converters',
    'pymysql.cursors',
    'pymysql.connections',
    'sqlalchemy.dialects.mysql.pymysql',
    'sqlalchemy.pool',
    'sqlalchemy.engine.default',
    'app.api.callback',
    'app.models.database',
    'app.workers.message_worker',
    'app.services.message_processor',
    'app.services.dify_client',
    'app.services.ecloud_client',
    'app.services.redis_queue',
    'app.utils.logger',
])
# æŽ’除不需要的模块
excludes = [
    'tkinter',
    'matplotlib',
    'numpy',
    'pandas',
    'scipy',
    'PIL',
    'IPython',
    'jupyter',
    'notebook',
    'pytest',
    'test',
    'tests',
]
block_cipher = None
a = Analysis(
    ['startup.py'],
    pathex=[project_root],
    binaries=[],
    datas=datas,
    hiddenimports=hiddenimports,
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=excludes,
    win_no_prefer_redirects=False,
    win_private_assemblies=False,
    cipher=block_cipher,
    noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
    pyz,
    a.scripts,
    a.binaries,
    a.zipfiles,
    a.datas,
    [],
    name='ecloud_dify',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=True,
    upx_exclude=[],
    runtime_tmpdir=None,
    console=True,
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
    icon=None,
)
init.sql
File was deleted
install_service.bat
New file
@@ -0,0 +1,102 @@
@echo off
chcp 65001 >nul
echo ========================================
echo E云管家-DifyAI对接服务 Windows服务安装脚本
echo ========================================
echo.
:: æ£€æŸ¥ç®¡ç†å‘˜æƒé™
net session >nul 2>&1
if %errorLevel% == 0 (
    echo æ£€æµ‹åˆ°ç®¡ç†å‘˜æƒé™ï¼Œç»§ç»­å®‰è£…...
) else (
    echo é”™è¯¯ï¼šéœ€è¦ç®¡ç†å‘˜æƒé™æ‰èƒ½å®‰è£…Windows服务
    echo è¯·å³é”®ç‚¹å‡»æ­¤è„šæœ¬ï¼Œé€‰æ‹©"以管理员身份运行"
    pause
    exit /b 1
)
:: èŽ·å–å½“å‰ç›®å½•
set "CURRENT_DIR=%~dp0"
set "SERVICE_NAME=ECloudDifyService"
set "SERVICE_DISPLAY_NAME=E云管家-DifyAI对接服务"
set "SERVICE_DESCRIPTION=将E云管家消息转发到DifyAI并返回AI回答的服务"
set "EXE_PATH=%CURRENT_DIR%ecloud_dify.exe"
echo å½“前目录: %CURRENT_DIR%
echo æœåŠ¡åç§°: %SERVICE_NAME%
echo å¯æ‰§è¡Œæ–‡ä»¶: %EXE_PATH%
echo.
:: æ£€æŸ¥exe文件是否存在
if not exist "%EXE_PATH%" (
    echo é”™è¯¯ï¼šæ‰¾ä¸åˆ°å¯æ‰§è¡Œæ–‡ä»¶ %EXE_PATH%
    echo è¯·ç¡®ä¿å·²ç»å®Œæˆé¡¹ç›®æ‰“包,并且ecloud_dify.exe文件存在
    pause
    exit /b 1
)
:: åœæ­¢å¹¶åˆ é™¤å·²å­˜åœ¨çš„æœåŠ¡
echo æ£€æŸ¥æ˜¯å¦å­˜åœ¨åŒåæœåŠ¡...
sc query "%SERVICE_NAME%" >nul 2>&1
if %errorLevel% == 0 (
    echo å‘现已存在的服务,正在停止并删除...
    sc stop "%SERVICE_NAME%" >nul 2>&1
    timeout /t 3 /nobreak >nul
    sc delete "%SERVICE_NAME%" >nul 2>&1
    if %errorLevel% == 0 (
        echo å·²åˆ é™¤æ—§æœåŠ¡
    ) else (
        echo è­¦å‘Šï¼šåˆ é™¤æ—§æœåŠ¡å¤±è´¥ï¼Œç»§ç»­å®‰è£…æ–°æœåŠ¡
    )
    echo.
)
:: åˆ›å»ºWindows服务
echo æ­£åœ¨åˆ›å»ºWindows服务...
sc create "%SERVICE_NAME%" binPath= "\"%EXE_PATH%\"" DisplayName= "%SERVICE_DISPLAY_NAME%" start= auto
if %errorLevel% == 0 (
    echo æœåŠ¡åˆ›å»ºæˆåŠŸ
) else (
    echo é”™è¯¯ï¼šæœåŠ¡åˆ›å»ºå¤±è´¥
    pause
    exit /b 1
)
:: è®¾ç½®æœåŠ¡æè¿°
sc description "%SERVICE_NAME%" "%SERVICE_DESCRIPTION%"
:: è®¾ç½®æœåŠ¡æ¢å¤é€‰é¡¹ï¼ˆå¤±è´¥æ—¶è‡ªåŠ¨é‡å¯ï¼‰
sc failure "%SERVICE_NAME%" reset= 86400 actions= restart/5000/restart/10000/restart/30000
echo.
echo æœåŠ¡å®‰è£…å®Œæˆï¼
echo.
echo å¯ç”¨çš„æ“ä½œï¼š
echo 1. å¯åŠ¨æœåŠ¡: sc start %SERVICE_NAME%
echo 2. åœæ­¢æœåŠ¡: sc stop %SERVICE_NAME%
echo 3. æŸ¥çœ‹æœåŠ¡çŠ¶æ€: sc query %SERVICE_NAME%
echo 4. åˆ é™¤æœåŠ¡: sc delete %SERVICE_NAME%
echo.
echo æˆ–者使用Windows服务管理器 (services.msc) è¿›è¡Œç®¡ç†
echo.
:: è¯¢é—®æ˜¯å¦ç«‹å³å¯åŠ¨æœåŠ¡
set /p START_NOW="是否立即启动服务?(Y/N): "
if /i "%START_NOW%"=="Y" (
    echo æ­£åœ¨å¯åŠ¨æœåŠ¡...
    sc start "%SERVICE_NAME%"
    if %errorLevel% == 0 (
        echo æœåŠ¡å¯åŠ¨æˆåŠŸï¼
        echo æœåŠ¡å°†åœ¨ç«¯å£ 7979 ä¸Šè¿è¡Œ
        echo å¯ä»¥é€šè¿‡ http://localhost:7979 è®¿é—®æœåŠ¡
    ) else (
        echo æœåŠ¡å¯åŠ¨å¤±è´¥ï¼Œè¯·æ£€æŸ¥é…ç½®æ–‡ä»¶å’Œæ—¥å¿—
    )
) else (
    echo æœåŠ¡å·²å®‰è£…ä½†æœªå¯åŠ¨
    echo å¯ä»¥ç¨åŽé€šè¿‡ sc start %SERVICE_NAME% å¯åŠ¨æœåŠ¡
)
echo.
pause
logs/app.log
@@ -1,4 +1,9 @@
2025-07-22 14:23:00 | INFO | __main__:<module>:94 - å¯åЍE云管家-DifyAI对接服务
2025-07-22 14:24:44 | INFO | __main__:<module>:94 - å¯åЍE云管家-DifyAI对接服务
2025-07-22 14:25:57 | INFO | __main__:<module>:94 - å¯åЍE云管家-DifyAI对接服务
2025-07-22 14:37:50 | INFO | __main__:<module>:94 - å¯åЍE云管家-DifyAI对接服务
2025-07-23 14:46:37 | INFO | __main__:<module>:105 - å¯åЍE云管家-DifyAI对接服务
2025-07-23 15:03:24 | INFO | __main__:<module>:105 - å¯åЍE云管家-DifyAI对接服务
2025-07-23 16:48:21 | INFO | __main__:<module>:121 - å¯åЍE云管家-DifyAI对接服务
2025-07-23 16:50:14 | INFO | __main__:<module>:121 - å¯åЍE云管家-DifyAI对接服务
2025-07-23 16:55:09 | INFO | __main__:<module>:121 - å¯åЍE云管家-DifyAI对接服务
2025-07-23 16:56:49 | INFO | __main__:<module>:121 - å¯åЍE云管家-DifyAI对接服务
2025-07-23 17:19:58 | INFO | __main__:<module>:121 - å¯åЍE云管家-DifyAI对接服务
2025-07-23 17:43:16 | INFO | __main__:<module>:105 - å¯åЍE云管家-DifyAI对接服务
2025-07-23 17:44:16 | INFO | __main__:<module>:105 - å¯åЍE云管家-DifyAI对接服务
main.py
@@ -7,6 +7,7 @@
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from loguru import logger
import time
from config import settings
from app.api.callback import router as callback_router
from app.models.database import create_tables
@@ -81,12 +82,22 @@
    }
@app.get("/health")
async def health_check():
    """健康检查接口"""
    return {
        "status": "healthy",
        "message": "E云管家-DifyAI对接服务运行正常",
        "timestamp": int(time.time()),
    }
if __name__ == "__main__":
    # é…ç½®æ—¥å¿—
    logger.add(
        settings.log_file,
        rotation="1 day",
        retention="30 days",
        retention="7 days",
        level=settings.log_level,
        format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
    )
requirements.txt
@@ -11,3 +11,5 @@
cryptography==45.0.5
pytest==8.4.1
httpx==0.28.1
pyinstaller==6.3.0
pywin32==306
start.bat
New file
@@ -0,0 +1,42 @@
@echo off
chcp 65001 >nul
echo ========================================
echo E云管家-DifyAI对接服务 å¯åŠ¨è„šæœ¬
echo ========================================
echo.
:: èŽ·å–å½“å‰ç›®å½•
set "CURRENT_DIR=%~dp0"
set "EXE_PATH=%CURRENT_DIR%ecloud_dify.exe"
echo å½“前目录: %CURRENT_DIR%
echo å¯æ‰§è¡Œæ–‡ä»¶: %EXE_PATH%
echo.
:: æ£€æŸ¥exe文件是否存在
if not exist "%EXE_PATH%" (
    echo é”™è¯¯ï¼šæ‰¾ä¸åˆ°å¯æ‰§è¡Œæ–‡ä»¶ %EXE_PATH%
    echo è¯·ç¡®ä¿å·²ç»å®Œæˆé¡¹ç›®æ‰“包,并且ecloud_dify.exe文件存在
    pause
    exit /b 1
)
:: æ£€æŸ¥é…ç½®æ–‡ä»¶
if not exist "%CURRENT_DIR%config.json" (
    echo è­¦å‘Šï¼šæœªæ‰¾åˆ°config.json配置文件
    echo ç¨‹åºå°†ä½¿ç”¨é»˜è®¤é…ç½®æˆ–config.production.json模板
    echo.
)
echo æ­£åœ¨å¯åЍE云管家-DifyAI对接服务...
echo æœåŠ¡å°†åœ¨ç«¯å£ 7979 ä¸Šè¿è¡Œ
echo å¯ä»¥é€šè¿‡ http://localhost:7979 è®¿é—®æœåŠ¡
echo æŒ‰ Ctrl+C åœæ­¢æœåŠ¡
echo.
:: å¯åŠ¨æœåŠ¡
"%EXE_PATH%"
echo.
echo æœåŠ¡å·²åœæ­¢
pause
startup.py
New file
@@ -0,0 +1,139 @@
"""
启动脚本 - å¤„理配置文件和目录初始化
"""
import os
import sys
import json
import shutil
from pathlib import Path
def get_exe_dir():
    """获取exe文件所在目录"""
    if getattr(sys, 'frozen', False):
        # å¦‚果是打包后的exe文件
        return os.path.dirname(sys.executable)
    else:
        # å¦‚果是开发环境
        return os.path.dirname(os.path.abspath(__file__))
def ensure_directories():
    """确保必要的目录存在"""
    exe_dir = get_exe_dir()
    # åˆ›å»ºæ—¥å¿—目录
    logs_dir = os.path.join(exe_dir, 'logs')
    if not os.path.exists(logs_dir):
        os.makedirs(logs_dir)
        print(f"创建日志目录: {logs_dir}")
    return exe_dir
def ensure_config_file():
    """确保配置文件存在"""
    exe_dir = get_exe_dir()
    config_file = os.path.join(exe_dir, 'config.json')
    if not os.path.exists(config_file):
        # å¦‚æžœconfig.json不存在,尝试复制生产配置模板
        production_config = os.path.join(exe_dir, 'config.production.json')
        example_config = os.path.join(exe_dir, 'config.example.json')
        if os.path.exists(production_config):
            shutil.copy2(production_config, config_file)
            print(f"复制生产配置文件: {production_config} -> {config_file}")
        elif os.path.exists(example_config):
            shutil.copy2(example_config, config_file)
            print(f"复制示例配置文件: {example_config} -> {config_file}")
        else:
            # åˆ›å»ºé»˜è®¤é…ç½®æ–‡ä»¶
            default_config = {
                "database": {
                    "url": "mysql+pymysql://root:password@localhost:3306/ecloud_dify"
                },
                "redis": {
                    "url": "redis://localhost:6379/0"
                },
                "ecloud": {
                    "base_url": "http://125.122.152.142:9899",
                    "authorization": "your_ecloud_authorization_token",
                    "w_id": "your_ecloud_w_id"
                },
                "dify": {
                    "base_url": "https://api.dify.ai/v1",
                    "api_key": "your_dify_api_key"
                },
                "server": {
                    "host": "0.0.0.0",
                    "port": 7979,
                    "debug": False
                },
                "logging": {
                    "level": "INFO",
                    "file": "logs/app.log"
                },
                "message_processing": {
                    "max_retry_count": 3,
                    "retry_delay": 5,
                    "queue_timeout": 300
                },
                "customer_service": {
                    "names": ["客服1", "客服2"]
                }
            }
            with open(config_file, 'w', encoding='utf-8') as f:
                json.dump(default_config, f, indent=2, ensure_ascii=False)
            print(f"创建默认配置文件: {config_file}")
    return config_file
def setup_environment():
    """设置运行环境"""
    exe_dir = ensure_directories()
    config_file = ensure_config_file()
    # è®¾ç½®å·¥ä½œç›®å½•为exe所在目录
    os.chdir(exe_dir)
    print(f"设置工作目录: {exe_dir}")
    return exe_dir, config_file
if __name__ == "__main__":
    setup_environment()
    # å¯¼å…¥å¹¶å¯åŠ¨ä¸»åº”ç”¨
    try:
        from main import app
        import uvicorn
        from config import settings
        from loguru import logger
        # é…ç½®æ—¥å¿—
        logger.add(
            settings.log_file,
            rotation="1 day",
            retention="7 days",
            level=settings.log_level,
            format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
        )
        logger.info("启动E云管家-DifyAI对接服务")
        # å¯åŠ¨æœåŠ¡
        uvicorn.run(
            app,
            host=settings.server_host,
            port=settings.server_port,
            log_level=settings.log_level.lower(),
        )
    except Exception as e:
        print(f"启动失败: {e}")
        input("按任意键退出...")
        sys.exit(1)
uninstall_service.bat
New file
@@ -0,0 +1,61 @@
@echo off
chcp 65001 >nul
echo ========================================
echo E云管家-DifyAI对接服务 Windows服务卸载脚本
echo ========================================
echo.
:: æ£€æŸ¥ç®¡ç†å‘˜æƒé™
net session >nul 2>&1
if %errorLevel% == 0 (
    echo æ£€æµ‹åˆ°ç®¡ç†å‘˜æƒé™ï¼Œç»§ç»­å¸è½½...
) else (
    echo é”™è¯¯ï¼šéœ€è¦ç®¡ç†å‘˜æƒé™æ‰èƒ½å¸è½½Windows服务
    echo è¯·å³é”®ç‚¹å‡»æ­¤è„šæœ¬ï¼Œé€‰æ‹©"以管理员身份运行"
    pause
    exit /b 1
)
set "SERVICE_NAME=ECloudDifyService"
echo æœåŠ¡åç§°: %SERVICE_NAME%
echo.
:: æ£€æŸ¥æœåŠ¡æ˜¯å¦å­˜åœ¨
echo æ£€æŸ¥æœåŠ¡æ˜¯å¦å­˜åœ¨...
sc query "%SERVICE_NAME%" >nul 2>&1
if %errorLevel% == 0 (
    echo æ‰¾åˆ°æœåŠ¡ï¼Œæ­£åœ¨å¸è½½...
    :: åœæ­¢æœåŠ¡
    echo æ­£åœ¨åœæ­¢æœåŠ¡...
    sc stop "%SERVICE_NAME%" >nul 2>&1
    if %errorLevel% == 0 (
        echo æœåŠ¡å·²åœæ­¢
    ) else (
        echo æœåŠ¡å¯èƒ½å·²ç»åœæ­¢æˆ–åœæ­¢å¤±è´¥
    )
    :: ç­‰å¾…服务完全停止
    echo ç­‰å¾…服务完全停止...
    timeout /t 5 /nobreak >nul
    :: åˆ é™¤æœåŠ¡
    echo æ­£åœ¨åˆ é™¤æœåŠ¡...
    sc delete "%SERVICE_NAME%"
    if %errorLevel% == 0 (
        echo æœåŠ¡åˆ é™¤æˆåŠŸï¼
    ) else (
        echo é”™è¯¯ï¼šæœåŠ¡åˆ é™¤å¤±è´¥
        pause
        exit /b 1
    )
) else (
    echo æœªæ‰¾åˆ°æœåŠ¡ %SERVICE_NAME%
    echo å¯èƒ½æœåŠ¡å·²ç»è¢«åˆ é™¤æˆ–ä»Žæœªå®‰è£…
)
echo.
echo æœåŠ¡å¸è½½å®Œæˆï¼
echo.
pause