Compare commits
9 Commits
640e02f89e
..
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 9200df7842 | |||
| eb361fe77f | |||
| 4627a2268f | |||
| 46f756428e | |||
| a22b001680 | |||
| c97e96c620 | |||
| 94a8656c7f | |||
| bc6c907872 | |||
| 2b13fdab99 |
@@ -1,27 +1,27 @@
|
|||||||
OPENAI_API_BASE=https://api.siliconflow.cn/v1/
|
OPENAI_API_BASE=https://api.siliconflow.cn/v1/
|
||||||
MODEL_NAME=deepseek-ai/DeepSeek-V3
|
MODEL_NAME=deepseek-ai/DeepSeek-V3
|
||||||
|
|
||||||
RERANKER_BASE_URL=http://10.1.16.39:9995
|
# RERANKER_BASE_URL=http://10.1.16.39:9995
|
||||||
RERANKER_MODEL_NAME=bge-reranker-v2-m3
|
# RERANKER_MODEL_NAME=bge-reranker-v2-m3
|
||||||
RERANKER_API_KEY=test
|
# RERANKER_API_KEY=test
|
||||||
|
|
||||||
EMBEDDING_BASE_URL=http://10.1.16.39:9995
|
# EMBEDDING_BASE_URL=http://10.1.16.39:9995
|
||||||
EMBEDDING_MODEL_NAME=bge-m3
|
# EMBEDDING_MODEL_NAME=bge-m3
|
||||||
EMBEDDING_API_KEY=test
|
# EMBEDDING_API_KEY=test
|
||||||
|
|
||||||
|
|
||||||
DIFY_BSAE_URL=http://10.1.16.39/v1
|
# DIFY_BSAE_URL=http://10.1.16.39/v1
|
||||||
DIFY_APP_KEY=app-CPoOMaGDsLRPAe9TW7Xjhszy
|
# DIFY_APP_KEY=app-CPoOMaGDsLRPAe9TW7Xjhszy
|
||||||
DIFY_DATASET_KEY=dataset-skLjmPVonjHo119OWNf3kAmY
|
# DIFY_DATASET_KEY=dataset-skLjmPVonjHo119OWNf3kAmY
|
||||||
|
|
||||||
DIFY_PG_HOST = 10.1.16.39
|
# DIFY_PG_HOST = 10.1.16.39
|
||||||
DIFY_PG_PORT = 5432
|
# DIFY_PG_PORT = 5432
|
||||||
DIFY_PG_USER = postgres
|
# DIFY_PG_USER = postgres
|
||||||
DIFY_PG_PASSWORD = difyai123456
|
# DIFY_PG_PASSWORD = difyai123456
|
||||||
DIFY_PG_DATABASE = dify
|
# DIFY_PG_DATABASE = dify
|
||||||
|
|
||||||
|
|
||||||
ENABLE_LANGFUSE=true
|
# ENABLE_LANGFUSE=true
|
||||||
LANGFUSE_PUBLIC_KEY=pk-lf-4e9b7cbe-528c-4697-b73c-33257a60072c
|
# LANGFUSE_PUBLIC_KEY=pk-lf-4e9b7cbe-528c-4697-b73c-33257a60072c
|
||||||
LANGFUSE_SECRET_KEY=sk-lf-cd8a78c5-2538-455e-a85a-87b6e1aa69d0
|
# LANGFUSE_SECRET_KEY=sk-lf-cd8a78c5-2538-455e-a85a-87b6e1aa69d0
|
||||||
LANGFUSE_HOST=http://10.1.6.34:3000
|
# LANGFUSE_HOST=http://10.1.6.34:3000
|
||||||
+1
-3
@@ -11,9 +11,7 @@ data/excel/*.xlsx
|
|||||||
!data/excel/Excel版 清单定额库/
|
!data/excel/Excel版 清单定额库/
|
||||||
!data/excel/Excel版 清单定额库/**
|
!data/excel/Excel版 清单定额库/**
|
||||||
data/logs/*
|
data/logs/*
|
||||||
rag2_0/dify/Test.py
|
|
||||||
data/query_logs/*
|
|
||||||
data/conversations/*
|
|
||||||
data/test*
|
data/test*
|
||||||
data/temp*
|
data/temp*
|
||||||
data/db/answer_logs.db
|
data/db/answer_logs.db
|
||||||
|
data/db/qingdan_ding_e_ku.db
|
||||||
|
|||||||
Vendored
+8
-2
@@ -10,7 +10,10 @@
|
|||||||
"request": "launch",
|
"request": "launch",
|
||||||
"program": "${file}",
|
"program": "${file}",
|
||||||
"console": "integratedTerminal",
|
"console": "integratedTerminal",
|
||||||
"justMyCode": true
|
"justMyCode": true,
|
||||||
|
"env": {
|
||||||
|
"PYTHONPATH": "${workspaceFolder}"
|
||||||
|
}
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name": "IntentRecognition",
|
"name": "IntentRecognition",
|
||||||
@@ -18,7 +21,10 @@
|
|||||||
"request": "launch",
|
"request": "launch",
|
||||||
"program": "${workspaceFolder}/rag2_0/demo/intent_recognition_example.py",
|
"program": "${workspaceFolder}/rag2_0/demo/intent_recognition_example.py",
|
||||||
"console": "integratedTerminal",
|
"console": "integratedTerminal",
|
||||||
"justMyCode": true
|
"justMyCode": true,
|
||||||
|
"env": {
|
||||||
|
"PYTHONPATH": "${workspaceFolder}"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
@@ -57,7 +57,6 @@ sk-benuasjbhbxvdmgxishibmtpfyieamlfclmdclfbqloqsmaf
|
|||||||
sk-ufmqbuplpjvzzlzohvsxertwgnguhipsbajxnxecvvccozly
|
sk-ufmqbuplpjvzzlzohvsxertwgnguhipsbajxnxecvvccozly
|
||||||
sk-rypfoscrczeelowmrsixiuyunyqmqvknaprsnzmdguwzrkzx
|
sk-rypfoscrczeelowmrsixiuyunyqmqvknaprsnzmdguwzrkzx
|
||||||
sk-lucemnosmcxuwedvzilpefuxjnyvaxldpbgaqwnwalxmntul
|
sk-lucemnosmcxuwedvzilpefuxjnyvaxldpbgaqwnwalxmntul
|
||||||
sk-niymkyuzpyovndvvqvpaniiqfgoofnxczhdmjjessiocbeul
|
|
||||||
sk-cxlvgeuxavxfcajprxietuqyqjngtbrwrmrmrioxmgtbkpci
|
sk-cxlvgeuxavxfcajprxietuqyqjngtbrwrmrmrioxmgtbkpci
|
||||||
sk-vjjsuzntqbhcmelfsuquqyoxjivxcfwyxnrhpwzobgxlpmrv
|
sk-vjjsuzntqbhcmelfsuquqyoxjivxcfwyxnrhpwzobgxlpmrv
|
||||||
sk-hbgctnpvntsnelveaudpekyncfgstdfazezboxmcgjvudzyg
|
sk-hbgctnpvntsnelveaudpekyncfgstdfazezboxmcgjvudzyg
|
||||||
|
|||||||
@@ -29,7 +29,6 @@ def main(query: str) -> dict:
|
|||||||
|
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
sys.path.append(os.getcwd())
|
|
||||||
from rag2_0.dify.DifyQueryRetrieval import DifyQueryRetrieval
|
from rag2_0.dify.DifyQueryRetrieval import DifyQueryRetrieval
|
||||||
|
|
||||||
# 定义数据库路径
|
# 定义数据库路径
|
||||||
|
|||||||
@@ -18,7 +18,6 @@ import logging
|
|||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
sys.path.append(os.getcwd())
|
|
||||||
from rag2_0.dify.DifyQueryRetrieval import DifyQueryRetrieval
|
from rag2_0.dify.DifyQueryRetrieval import DifyQueryRetrieval
|
||||||
|
|
||||||
# 确保日志目录存在
|
# 确保日志目录存在
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ import pandas as pd
|
|||||||
from openpyxl import load_workbook
|
from openpyxl import load_workbook
|
||||||
import logging
|
import logging
|
||||||
import numpy as np
|
import numpy as np
|
||||||
sys.path.append(os.getcwd())
|
|
||||||
from rag2_0.tool.ModelTool import XinferenceEmbeddings
|
from rag2_0.tool.ModelTool import XinferenceEmbeddings
|
||||||
from langchain_community.vectorstores import SQLiteVSS
|
from langchain_community.vectorstores import SQLiteVSS
|
||||||
|
|
||||||
|
|||||||
@@ -15,8 +15,8 @@ import logging
|
|||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
sys.path.append(os.getcwd())
|
|
||||||
from rag2_0.intent_recognition import AsyncIntentRecognizer
|
from rag2_0.intent_recognition import AsyncIntentRecognizer
|
||||||
|
from rag2_0.api.kefu_redirect_url import router as kefu_router
|
||||||
|
|
||||||
# 确保日志目录存在
|
# 确保日志目录存在
|
||||||
os.makedirs('data/logs', exist_ok=True)
|
os.makedirs('data/logs', exist_ok=True)
|
||||||
@@ -85,6 +85,9 @@ app.add_middleware(
|
|||||||
allow_headers=["*"],
|
allow_headers=["*"],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# 注册外部路由
|
||||||
|
app.include_router(kefu_router)
|
||||||
|
|
||||||
# 全局变量存储AsyncIntentRecognizer实例
|
# 全局变量存储AsyncIntentRecognizer实例
|
||||||
_instance = None
|
_instance = None
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,92 @@
|
|||||||
|
from fastapi import APIRouter
|
||||||
|
from fastapi.responses import RedirectResponse
|
||||||
|
import os
|
||||||
|
import sqlite3
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from queue import Queue, Full
|
||||||
|
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
|
||||||
|
# 以当前文件为基准的相对路径:../../data/db
|
||||||
|
PROJECT_ROOT = os.getcwd()
|
||||||
|
DB_DIR = os.path.join(PROJECT_ROOT, "data", "db")
|
||||||
|
DB_FILE = os.path.join(DB_DIR, "redirects.sqlite3")
|
||||||
|
TABLE_SQL = (
|
||||||
|
"CREATE TABLE IF NOT EXISTS redirects ("
|
||||||
|
" msg_id TEXT PRIMARY KEY,"
|
||||||
|
" url TEXT NOT NULL"
|
||||||
|
")"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _ensure_db():
|
||||||
|
"""确保数据库与表存在。"""
|
||||||
|
os.makedirs(DB_DIR, exist_ok=True)
|
||||||
|
with sqlite3.connect(DB_FILE) as conn:
|
||||||
|
cur = conn.cursor()
|
||||||
|
cur.execute(TABLE_SQL)
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
|
||||||
|
def save_redirect(msg_id: str, url: str) -> None:
|
||||||
|
"""将 msg_id 与 url 写入 SQLite,若已存在则忽略。
|
||||||
|
|
||||||
|
使用 INSERT OR IGNORE 结合 PRIMARY KEY(msg_id) 来避免重复写入。
|
||||||
|
"""
|
||||||
|
_ensure_db()
|
||||||
|
with sqlite3.connect(DB_FILE) as conn:
|
||||||
|
cur = conn.cursor()
|
||||||
|
cur.execute(
|
||||||
|
"INSERT OR IGNORE INTO redirects (msg_id, url) VALUES (?, ?)",
|
||||||
|
(msg_id, url),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
|
||||||
|
# ========= 异步写库队列与后台线程 =========
|
||||||
|
_write_queue: "Queue[tuple[str, str]]" = Queue(maxsize=10000)
|
||||||
|
|
||||||
|
|
||||||
|
def _write_worker():
|
||||||
|
_ensure_db()
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
msg_id, url = _write_queue.get()
|
||||||
|
try:
|
||||||
|
with sqlite3.connect(DB_FILE) as conn:
|
||||||
|
cur = conn.cursor()
|
||||||
|
cur.execute(
|
||||||
|
"INSERT OR IGNORE INTO redirects (msg_id, url) VALUES (?, ?)",
|
||||||
|
(msg_id, url),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
except Exception:
|
||||||
|
# 失败忽略,避免阻断工作线程
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
_write_queue.task_done()
|
||||||
|
except Exception:
|
||||||
|
# 防御性 sleep,避免异常导致CPU空转
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
|
||||||
|
_worker_thread = threading.Thread(target=_write_worker, daemon=True)
|
||||||
|
_worker_thread.start()
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/kefu_login", summary="客服登录页重定向")
|
||||||
|
async def kefu_redirect(msg_id:str):
|
||||||
|
"""重定向到客服登录页。"""
|
||||||
|
target_url = "https://www.booway.com.cn/kefu/toLoginPage"
|
||||||
|
# 写入 SQLite:若 msg_id 已存在将不会重复写入
|
||||||
|
try:
|
||||||
|
if msg_id:
|
||||||
|
# 走异步队列
|
||||||
|
_write_queue.put_nowait((msg_id, target_url))
|
||||||
|
except Exception:
|
||||||
|
# 出于稳健性考虑,即使写库失败也不影响重定向
|
||||||
|
pass
|
||||||
|
return RedirectResponse(target_url, status_code=302)
|
||||||
|
|
||||||
@@ -10,7 +10,6 @@ import sys
|
|||||||
import os
|
import os
|
||||||
|
|
||||||
# 导入ExcelToSQLiteProcessor类
|
# 导入ExcelToSQLiteProcessor类
|
||||||
sys.path.append(os.getcwd())
|
|
||||||
from rag2_0.api.create_qingdan_dinge_database import ExcelToSQLiteProcessor, create_db
|
from rag2_0.api.create_qingdan_dinge_database import ExcelToSQLiteProcessor, create_db
|
||||||
# 导入向量检索相关类
|
# 导入向量检索相关类
|
||||||
from rag2_0.tool.ModelTool import XinferenceEmbeddings
|
from rag2_0.tool.ModelTool import XinferenceEmbeddings
|
||||||
|
|||||||
@@ -18,8 +18,6 @@ from tqdm import tqdm
|
|||||||
import glob
|
import glob
|
||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
# 将项目根目录添加到Python路径
|
|
||||||
sys.path.append(os.getcwd())
|
|
||||||
from rag2_0.tool.ModelTool import OpenAiLLM
|
from rag2_0.tool.ModelTool import OpenAiLLM
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
@@ -142,14 +140,6 @@ class DialogueToWorkorder:
|
|||||||
"base_url": os.getenv("OPENAI_API_BASE"),
|
"base_url": os.getenv("OPENAI_API_BASE"),
|
||||||
"timeout": httpx.Timeout(600.0)
|
"timeout": httpx.Timeout(600.0)
|
||||||
}
|
}
|
||||||
# self.llm_params = llm_params or {
|
|
||||||
# "temperature": 0.2,
|
|
||||||
# "top_p":0.95,
|
|
||||||
# "model": "deepseek-r1",
|
|
||||||
# "api_key": "25t%Syu6I9yxX2IuTN",
|
|
||||||
# "base_url": "http://10.1.0.154:8000/v1",
|
|
||||||
# "timeout": httpx.Timeout(600.0)
|
|
||||||
# }
|
|
||||||
self.llm = self._get_llm_instance()
|
self.llm = self._get_llm_instance()
|
||||||
|
|
||||||
# 创建工单JSON文件目录
|
# 创建工单JSON文件目录
|
||||||
|
|||||||
+155
-273
@@ -5,23 +5,18 @@ from __future__ import annotations
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import configparser
|
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from contextlib import contextmanager
|
|
||||||
import threading
|
import threading
|
||||||
import time
|
|
||||||
from queue import Queue, Empty, Full
|
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import pymysql
|
import pymysql
|
||||||
from pymysql.connections import Connection
|
|
||||||
from pymysql.cursors import Cursor
|
|
||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
import sys
|
import sys
|
||||||
|
from queue import Queue, Empty, Full
|
||||||
|
|
||||||
os.makedirs('./data/logs', exist_ok=True)
|
os.makedirs('./data/logs', exist_ok=True)
|
||||||
# 配置日志
|
# 配置日志
|
||||||
@@ -35,6 +30,18 @@ logging.basicConfig(
|
|||||||
)
|
)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# =====================
|
||||||
|
# 硬编码数据库配置(简化)
|
||||||
|
# =====================
|
||||||
|
DB_HOST = '192.168.0.123'
|
||||||
|
DB_PORT = 3307
|
||||||
|
DB_USER = 'fuzhimei'
|
||||||
|
DB_PASSWORD = 'fuzhimei@135'
|
||||||
|
DB_CHARSET = 'utf8mb4'
|
||||||
|
DB_CONNECT_TIMEOUT = 10
|
||||||
|
DB_READ_TIMEOUT = 300
|
||||||
|
DB_WRITE_TIMEOUT = 300
|
||||||
|
|
||||||
def parse_session_tags(input_string):
|
def parse_session_tags(input_string):
|
||||||
"""
|
"""
|
||||||
解析sessionTag格式的字符串,支持任意数量的sessionTag
|
解析sessionTag格式的字符串,支持任意数量的sessionTag
|
||||||
@@ -76,171 +83,6 @@ def parse_session_tags(input_string):
|
|||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class DatabaseConfig:
|
|
||||||
"""数据库配置类"""
|
|
||||||
host: str = '192.168.0.123'
|
|
||||||
port: int = 3307
|
|
||||||
user: str = 'fuzhimei'
|
|
||||||
password: str = 'fuzhimei@135'
|
|
||||||
charset: str = 'utf8mb4'
|
|
||||||
connect_timeout: int = 10
|
|
||||||
read_timeout: int = 300
|
|
||||||
write_timeout: int = 300
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_config_file(cls, config_file: str = 'config.ini') -> 'DatabaseConfig':
|
|
||||||
"""从配置文件加载配置"""
|
|
||||||
if not os.path.exists(config_file):
|
|
||||||
logger.warning(f"配置文件 {config_file} 不存在,使用默认配置")
|
|
||||||
return cls()
|
|
||||||
|
|
||||||
config = configparser.ConfigParser()
|
|
||||||
config.read(config_file, encoding='utf-8')
|
|
||||||
|
|
||||||
if 'database' not in config:
|
|
||||||
logger.warning("配置文件中没有 [database] 部分,使用默认配置")
|
|
||||||
return cls()
|
|
||||||
|
|
||||||
db_config = config['database']
|
|
||||||
return cls(
|
|
||||||
host=db_config.get('host', cls.host),
|
|
||||||
port=int(db_config.get('port', cls.port)),
|
|
||||||
user=db_config.get('user', cls.user),
|
|
||||||
password=db_config.get('password', cls.password),
|
|
||||||
charset=db_config.get('charset', cls.charset),
|
|
||||||
connect_timeout=int(db_config.get('connect_timeout', cls.connect_timeout)),
|
|
||||||
read_timeout=int(db_config.get('read_timeout', cls.read_timeout)),
|
|
||||||
write_timeout=int(db_config.get('write_timeout', cls.write_timeout))
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class ConnectionPool:
|
|
||||||
"""数据库连接池"""
|
|
||||||
|
|
||||||
def __init__(self, config: DatabaseConfig, max_connections: int = 10):
|
|
||||||
self.config = config
|
|
||||||
self.max_connections = max_connections
|
|
||||||
self.pool = Queue(maxsize=max_connections)
|
|
||||||
self.active_connections = 0
|
|
||||||
self.lock = threading.Lock()
|
|
||||||
|
|
||||||
# 预创建一些连接
|
|
||||||
self._initialize_pool()
|
|
||||||
|
|
||||||
def _initialize_pool(self) -> None:
|
|
||||||
"""初始化连接池,预创建一些连接"""
|
|
||||||
initial_connections = min(3, self.max_connections)
|
|
||||||
for _ in range(initial_connections):
|
|
||||||
try:
|
|
||||||
conn = self._create_connection()
|
|
||||||
if conn:
|
|
||||||
self.pool.put_nowait(conn)
|
|
||||||
self.active_connections += 1
|
|
||||||
except Full:
|
|
||||||
break
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"初始化连接池时创建连接失败: {e}")
|
|
||||||
|
|
||||||
def _create_connection(self) -> Optional[Connection]:
|
|
||||||
"""创建新的数据库连接"""
|
|
||||||
try:
|
|
||||||
conn = pymysql.connect(
|
|
||||||
host=self.config.host,
|
|
||||||
port=self.config.port,
|
|
||||||
user=self.config.user,
|
|
||||||
password=self.config.password,
|
|
||||||
charset=self.config.charset,
|
|
||||||
connect_timeout=self.config.connect_timeout,
|
|
||||||
read_timeout=self.config.read_timeout,
|
|
||||||
write_timeout=self.config.write_timeout,
|
|
||||||
autocommit=True
|
|
||||||
)
|
|
||||||
return conn
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"创建数据库连接失败: {e}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
@contextmanager
|
|
||||||
def get_connection(self):
|
|
||||||
"""获取连接的上下文管理器"""
|
|
||||||
conn = None
|
|
||||||
try:
|
|
||||||
# 尝试从池中获取连接
|
|
||||||
try:
|
|
||||||
conn = self.pool.get_nowait()
|
|
||||||
except Empty:
|
|
||||||
# 池中没有连接,尝试创建新连接
|
|
||||||
with self.lock:
|
|
||||||
if self.active_connections < self.max_connections:
|
|
||||||
conn = self._create_connection()
|
|
||||||
if conn:
|
|
||||||
self.active_connections += 1
|
|
||||||
else:
|
|
||||||
raise Exception("无法创建新的数据库连接")
|
|
||||||
else:
|
|
||||||
# 等待可用连接
|
|
||||||
logger.info("等待可用连接...")
|
|
||||||
conn = self.pool.get(timeout=30)
|
|
||||||
|
|
||||||
# 检查连接是否仍然有效
|
|
||||||
if conn and not self._is_connection_alive(conn):
|
|
||||||
logger.warning("连接已失效,重新创建")
|
|
||||||
try:
|
|
||||||
conn.close()
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
conn = self._create_connection()
|
|
||||||
if not conn:
|
|
||||||
raise Exception("重新创建连接失败")
|
|
||||||
|
|
||||||
yield conn
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"获取数据库连接时出错: {e}")
|
|
||||||
if conn:
|
|
||||||
try:
|
|
||||||
conn.close()
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
with self.lock:
|
|
||||||
self.active_connections -= 1
|
|
||||||
raise
|
|
||||||
else:
|
|
||||||
# 归还连接到池中
|
|
||||||
if conn:
|
|
||||||
try:
|
|
||||||
self.pool.put_nowait(conn)
|
|
||||||
except Full:
|
|
||||||
# 池已满,关闭连接
|
|
||||||
try:
|
|
||||||
conn.close()
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
with self.lock:
|
|
||||||
self.active_connections -= 1
|
|
||||||
|
|
||||||
def _is_connection_alive(self, conn: Connection) -> bool:
|
|
||||||
"""检查连接是否仍然有效"""
|
|
||||||
try:
|
|
||||||
conn.ping(reconnect=False)
|
|
||||||
return True
|
|
||||||
except:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def close_all(self) -> None:
|
|
||||||
"""关闭所有连接"""
|
|
||||||
logger.info("正在关闭连接池中的所有连接...")
|
|
||||||
while not self.pool.empty():
|
|
||||||
try:
|
|
||||||
conn = self.pool.get_nowait()
|
|
||||||
conn.close()
|
|
||||||
except (Empty, Exception):
|
|
||||||
break
|
|
||||||
|
|
||||||
self.active_connections = 0
|
|
||||||
logger.info("连接池已关闭")
|
|
||||||
|
|
||||||
|
|
||||||
class DataProcessor:
|
class DataProcessor:
|
||||||
"""数据处理器"""
|
"""数据处理器"""
|
||||||
@@ -357,12 +199,76 @@ class DataProcessor:
|
|||||||
|
|
||||||
|
|
||||||
class MariaDBClient:
|
class MariaDBClient:
|
||||||
"""优化后的MariaDB数据库客户端"""
|
"""简化版 MariaDB 客户端(内置轻量连接池以复用连接)"""
|
||||||
|
|
||||||
def __init__(self, config: DatabaseConfig, max_connections: int = 10):
|
def __init__(self, max_connections: int = 10):
|
||||||
self.config = config
|
|
||||||
self.connection_pool = ConnectionPool(config, max_connections)
|
|
||||||
self.data_processor = DataProcessor()
|
self.data_processor = DataProcessor()
|
||||||
|
self._max_connections = max_connections
|
||||||
|
self._pool: Queue = Queue(maxsize=max_connections)
|
||||||
|
self._active = 0
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
# 预创建少量连接,降低首次延迟
|
||||||
|
initial = min(3, max_connections)
|
||||||
|
for _ in range(initial):
|
||||||
|
conn = self._create_connection()
|
||||||
|
if conn:
|
||||||
|
try:
|
||||||
|
self._pool.put_nowait(conn)
|
||||||
|
self._active += 1
|
||||||
|
except Full:
|
||||||
|
try:
|
||||||
|
conn.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _create_connection(self):
|
||||||
|
try:
|
||||||
|
return pymysql.connect(
|
||||||
|
host=DB_HOST,
|
||||||
|
port=DB_PORT,
|
||||||
|
user=DB_USER,
|
||||||
|
password=DB_PASSWORD,
|
||||||
|
charset=DB_CHARSET,
|
||||||
|
connect_timeout=DB_CONNECT_TIMEOUT,
|
||||||
|
read_timeout=DB_READ_TIMEOUT,
|
||||||
|
write_timeout=DB_WRITE_TIMEOUT,
|
||||||
|
autocommit=True
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"创建数据库连接失败: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _acquire_connection(self):
|
||||||
|
# 先尝试不阻塞获取
|
||||||
|
try:
|
||||||
|
return self._pool.get_nowait()
|
||||||
|
except Empty:
|
||||||
|
# 池为空,若可创建新连接则创建,否则阻塞等待
|
||||||
|
with self._lock:
|
||||||
|
if self._active < self._max_connections:
|
||||||
|
conn = self._create_connection()
|
||||||
|
if conn:
|
||||||
|
self._active += 1
|
||||||
|
return conn
|
||||||
|
# 达到上限,阻塞等待可用连接
|
||||||
|
try:
|
||||||
|
return self._pool.get(timeout=30)
|
||||||
|
except Empty:
|
||||||
|
raise RuntimeError("获取数据库连接超时")
|
||||||
|
|
||||||
|
def _release_connection(self, conn):
|
||||||
|
if not conn:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self._pool.put_nowait(conn)
|
||||||
|
except Full:
|
||||||
|
# 池已满,关闭多余连接
|
||||||
|
try:
|
||||||
|
conn.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
with self._lock:
|
||||||
|
self._active = max(0, self._active - 1)
|
||||||
|
|
||||||
def __enter__(self) -> 'MariaDBClient':
|
def __enter__(self) -> 'MariaDBClient':
|
||||||
return self
|
return self
|
||||||
@@ -371,30 +277,52 @@ class MariaDBClient:
|
|||||||
self.close()
|
self.close()
|
||||||
|
|
||||||
def close(self) -> None:
|
def close(self) -> None:
|
||||||
"""关闭客户端"""
|
"""关闭连接池中的连接"""
|
||||||
self.connection_pool.close_all()
|
try:
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
conn = self._pool.get_nowait()
|
||||||
|
except Empty:
|
||||||
|
break
|
||||||
|
try:
|
||||||
|
conn.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
with self._lock:
|
||||||
|
self._active = 0
|
||||||
|
|
||||||
def execute_query(self, sql: str, params: Optional[Tuple] = None) -> Tuple[Optional[pd.DataFrame], List[str]]:
|
def execute_query(self, sql: str, params: Optional[Tuple] = None) -> Tuple[Optional[pd.DataFrame], List[str]]:
|
||||||
"""执行SQL查询"""
|
"""执行SQL查询(复用连接池连接)"""
|
||||||
|
conn = None
|
||||||
try:
|
try:
|
||||||
with self.connection_pool.get_connection() as conn:
|
conn = self._acquire_connection()
|
||||||
with conn.cursor() as cursor:
|
with conn.cursor() as cursor:
|
||||||
cursor.execute(sql, params)
|
cursor.execute(sql, params)
|
||||||
results = cursor.fetchall()
|
results = cursor.fetchall()
|
||||||
|
|
||||||
# 获取列名
|
|
||||||
column_names = [desc[0] for desc in cursor.description] if cursor.description else []
|
column_names = [desc[0] for desc in cursor.description] if cursor.description else []
|
||||||
|
|
||||||
if results:
|
if results:
|
||||||
df = pd.DataFrame(results, columns=column_names)
|
df = pd.DataFrame(results, columns=column_names)
|
||||||
return df, column_names
|
return df, column_names
|
||||||
else:
|
else:
|
||||||
return pd.DataFrame(), column_names
|
return pd.DataFrame(), column_names
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"执行查询时出错: {e}")
|
logger.error(f"执行查询时出错: {e}")
|
||||||
logger.error(f"SQL: {sql}")
|
logger.error(f"SQL: {sql}")
|
||||||
return None, []
|
return None, []
|
||||||
|
finally:
|
||||||
|
if conn:
|
||||||
|
# 若连接异常,尝试关闭并减少活跃计数;否则归还
|
||||||
|
try:
|
||||||
|
conn.ping(reconnect=False)
|
||||||
|
self._release_connection(conn)
|
||||||
|
except Exception:
|
||||||
|
try:
|
||||||
|
conn.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
with self._lock:
|
||||||
|
self._active = max(0, self._active - 1)
|
||||||
|
|
||||||
def query_sessions(self, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
|
def query_sessions(self, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
|
||||||
"""查询指定日期范围内的会话数据"""
|
"""查询指定日期范围内的会话数据"""
|
||||||
@@ -483,101 +411,18 @@ class MariaDBClient:
|
|||||||
logger.error(f"导出到Excel时出错: {e}")
|
logger.error(f"导出到Excel时出错: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
class SessionProcessor:
|
|
||||||
"""会话处理器,负责并发处理"""
|
|
||||||
|
|
||||||
def __init__(self, db_client: MariaDBClient, max_workers: int = None):
|
|
||||||
self.db_client = db_client
|
|
||||||
self.max_workers = max_workers if max_workers is not None else os.cpu_count()
|
|
||||||
self.temp_save_lock = threading.Lock() # 添加锁用于保护临时保存操作
|
|
||||||
|
|
||||||
logger.info(f"初始化会话处理器: max_workers={self.max_workers}")
|
|
||||||
|
|
||||||
def process_sessions(self, sessions_df: pd.DataFrame) -> List[List[Dict[str, Any]]]:
|
|
||||||
"""处理所有会话数据"""
|
|
||||||
if sessions_df.empty:
|
|
||||||
logger.warning("没有会话数据需要处理")
|
|
||||||
return []
|
|
||||||
|
|
||||||
total_sessions = len(sessions_df)
|
|
||||||
logger.info(f"开始处理 {total_sessions} 个会话...")
|
|
||||||
|
|
||||||
all_conversations = []
|
|
||||||
|
|
||||||
# 直接并发处理每个会话
|
|
||||||
def process_single_session(session_row):
|
|
||||||
try:
|
|
||||||
session_id = session_row['SESSION_ID']
|
|
||||||
messages_df = self.db_client.query_messages_by_session_id(session_id)
|
|
||||||
|
|
||||||
if messages_df is not None and not messages_df.empty:
|
|
||||||
conversation = self.db_client.data_processor.messages_df_to_list(messages_df, session_row)
|
|
||||||
if conversation:
|
|
||||||
return conversation
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"处理会话 {session_row.get('SESSION_ID', 'unknown')} 时出错: {e}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
# 使用线程池处理所有会话
|
|
||||||
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
|
|
||||||
# 提交所有会话处理任务
|
|
||||||
future_to_session = {
|
|
||||||
executor.submit(process_single_session, row): i
|
|
||||||
for i, row in sessions_df.iterrows()
|
|
||||||
}
|
|
||||||
|
|
||||||
# 收集结果
|
|
||||||
with tqdm(total=total_sessions, desc="处理会话进度") as pbar:
|
|
||||||
for future in concurrent.futures.as_completed(future_to_session):
|
|
||||||
try:
|
|
||||||
conversation = future.result()
|
|
||||||
if conversation:
|
|
||||||
all_conversations.append(conversation)
|
|
||||||
|
|
||||||
# 每处理100个对话临时保存一次
|
|
||||||
if len(all_conversations) % 100 == 0:
|
|
||||||
with self.temp_save_lock:
|
|
||||||
logger.info(f"临时保存 {len(all_conversations)} 个对话")
|
|
||||||
temp_output_file = self.db_client.export_to_excel(
|
|
||||||
all_conversations,
|
|
||||||
f"客服对话记录_临时保存",
|
|
||||||
output_dir="/data/QueryRewrite/data/excel"
|
|
||||||
)
|
|
||||||
if temp_output_file:
|
|
||||||
logger.info(f"临时保存完成: {temp_output_file}")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
session_idx = future_to_session[future]
|
|
||||||
logger.error(f"处理会话索引 {session_idx} 时出错: {e}")
|
|
||||||
|
|
||||||
pbar.update(1)
|
|
||||||
|
|
||||||
logger.info(f"处理完成,共获得 {len(all_conversations)} 个有效对话")
|
|
||||||
return all_conversations
|
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
"""主函数"""
|
"""主函数(精简版)"""
|
||||||
try:
|
try:
|
||||||
# 加载配置
|
logger.info(f"使用数据库配置: {DB_HOST}:{DB_PORT}")
|
||||||
config = DatabaseConfig.from_config_file()
|
|
||||||
logger.info(f"使用数据库配置: {config.host}:{config.port}")
|
|
||||||
|
|
||||||
# 创建数据库客户端
|
# 创建数据库客户端(简化)
|
||||||
with MariaDBClient(config, max_connections=12) as db_client:
|
with MariaDBClient() as db_client:
|
||||||
# 查询会话数据
|
# 查询会话数据
|
||||||
start_date = '2025-08-01 00:00:00'
|
start_date = '2025-08-01 00:00:00'
|
||||||
end_date = '2025-08-01 23:00:00'
|
end_date = '2025-08-01 23:00:00'
|
||||||
|
|
||||||
logger.info(f"查询时间范围: {start_date} 到 {end_date}")
|
logger.info(f"查询时间范围: {start_date} 到 {end_date}")
|
||||||
# 创建会话处理器
|
|
||||||
processor = SessionProcessor(db_client)
|
|
||||||
# is_debug = hasattr(sys, 'gettrace') and sys.gettrace() is not None
|
|
||||||
# if is_debug:
|
|
||||||
# messages_df = db_client.query_messages_by_session_id("86c919e0-09f1-11f0-84ae-2daf59566989")
|
|
||||||
# print(db_client.data_processor.messages_df_to_list(messages_df))
|
|
||||||
# return []
|
|
||||||
|
|
||||||
sessions_df = db_client.query_sessions(start_date, end_date)
|
sessions_df = db_client.query_sessions(start_date, end_date)
|
||||||
|
|
||||||
@@ -585,8 +430,46 @@ def main() -> None:
|
|||||||
logger.warning("没有找到符合条件的会话数据")
|
logger.warning("没有找到符合条件的会话数据")
|
||||||
return
|
return
|
||||||
|
|
||||||
# 处理会话数据
|
# 直接并发处理每个会话(替代 SessionProcessor)
|
||||||
all_conversations = processor.process_sessions(sessions_df)
|
total_sessions = len(sessions_df)
|
||||||
|
all_conversations: List[List[Dict[str, Any]]] = []
|
||||||
|
temp_save_lock = threading.Lock()
|
||||||
|
|
||||||
|
def process_single_session(session_row):
|
||||||
|
try:
|
||||||
|
session_id = session_row['SESSION_ID']
|
||||||
|
messages_df = db_client.query_messages_by_session_id(session_id)
|
||||||
|
if messages_df is not None and not messages_df.empty:
|
||||||
|
conversation = db_client.data_processor.messages_df_to_list(messages_df, session_row)
|
||||||
|
if conversation:
|
||||||
|
return conversation
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"处理会话 {session_row.get('SESSION_ID', 'unknown')} 时出错: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
|
||||||
|
future_to_session = {executor.submit(process_single_session, row): i for i, row in sessions_df.iterrows()}
|
||||||
|
with tqdm(total=total_sessions, desc="处理会话进度") as pbar:
|
||||||
|
for future in concurrent.futures.as_completed(future_to_session):
|
||||||
|
try:
|
||||||
|
conversation = future.result()
|
||||||
|
if conversation:
|
||||||
|
all_conversations.append(conversation)
|
||||||
|
if len(all_conversations) % 100 == 0:
|
||||||
|
with temp_save_lock:
|
||||||
|
logger.info(f"临时保存 {len(all_conversations)} 个对话")
|
||||||
|
temp_output_file = db_client.export_to_excel(
|
||||||
|
all_conversations,
|
||||||
|
f"客服对话记录_临时保存",
|
||||||
|
output_dir="/data/QueryRewrite/data/excel"
|
||||||
|
)
|
||||||
|
if temp_output_file:
|
||||||
|
logger.info(f"临时保存完成: {temp_output_file}")
|
||||||
|
except Exception as e:
|
||||||
|
session_idx = future_to_session[future]
|
||||||
|
logger.error(f"处理会话索引 {session_idx} 时出错: {e}")
|
||||||
|
pbar.update(1)
|
||||||
|
|
||||||
# 导出结果
|
# 导出结果
|
||||||
if all_conversations:
|
if all_conversations:
|
||||||
output_file = db_client.export_to_excel(
|
output_file = db_client.export_to_excel(
|
||||||
@@ -594,7 +477,6 @@ def main() -> None:
|
|||||||
"客服对话记录",
|
"客服对话记录",
|
||||||
output_dir="/data/QueryRewrite/data/excel"
|
output_dir="/data/QueryRewrite/data/excel"
|
||||||
)
|
)
|
||||||
|
|
||||||
if output_file:
|
if output_file:
|
||||||
logger.info(f"处理完成!共导出 {len(all_conversations)} 个对话到文件: {output_file}")
|
logger.info(f"处理完成!共导出 {len(all_conversations)} 个对话到文件: {output_file}")
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -20,7 +20,6 @@ import argparse
|
|||||||
from typing import List, Dict, Any
|
from typing import List, Dict, Any
|
||||||
from langchain.output_parsers import PydanticOutputParser
|
from langchain.output_parsers import PydanticOutputParser
|
||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel, Field
|
||||||
sys.path.append(os.getcwd())
|
|
||||||
from rag2_0.intent_recognition import AsyncIntentRecognizer
|
from rag2_0.intent_recognition import AsyncIntentRecognizer
|
||||||
from rag2_0.dify.DifyQueryRetrieval import DifyQueryRetrieval
|
from rag2_0.dify.DifyQueryRetrieval import DifyQueryRetrieval
|
||||||
from rag2_0.intent_recognition.DataModels import Classification
|
from rag2_0.intent_recognition.DataModels import Classification
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ import os
|
|||||||
import json
|
import json
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
import sys
|
import sys
|
||||||
sys.path.append(os.getcwd())
|
|
||||||
from rag2_0.intent_recognition import ProfessionalNounVectorizer
|
from rag2_0.intent_recognition import ProfessionalNounVectorizer
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
|||||||
@@ -15,7 +15,6 @@ from datetime import datetime
|
|||||||
import os
|
import os
|
||||||
from langchain_core.output_parsers import JsonOutputParser
|
from langchain_core.output_parsers import JsonOutputParser
|
||||||
|
|
||||||
sys.path.append(os.getcwd())
|
|
||||||
from rag2_0.dify.dify_client import ChatClient
|
from rag2_0.dify.dify_client import ChatClient
|
||||||
from rag2_0.tool.ModelTool import OpenAiLLM
|
from rag2_0.tool.ModelTool import OpenAiLLM
|
||||||
from rag2_0.dify.dify_tool import DifyTool
|
from rag2_0.dify.dify_tool import DifyTool
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import logging
|
|||||||
import time
|
import time
|
||||||
import asyncio
|
import asyncio
|
||||||
import httpx
|
import httpx
|
||||||
sys.path.append(os.getcwd())
|
|
||||||
|
|
||||||
from rag2_0.dify.dify_client.client import DifyClient, KnowledgeBaseClient
|
from rag2_0.dify.dify_client.client import DifyClient, KnowledgeBaseClient
|
||||||
from rag2_0.tool.ModelTool import XinferenceReRankerModel
|
from rag2_0.tool.ModelTool import XinferenceReRankerModel
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ import sys
|
|||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
sys.path.append(os.getcwd())
|
|
||||||
|
|
||||||
from rag2_0.dify.dify_client import DifyApi
|
from rag2_0.dify.dify_client import DifyApi
|
||||||
|
|
||||||
|
|||||||
@@ -17,7 +17,6 @@ logging.basicConfig(
|
|||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
sys.path.append(os.getcwd())
|
|
||||||
import rag2_0.dify.dify_client.dify_api as DifyApi
|
import rag2_0.dify.dify_client.dify_api as DifyApi
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import json
|
|||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
sys.path.append(os.getcwd())
|
|
||||||
from rag2_0.dify.dify_client import ChatClient
|
from rag2_0.dify.dify_client import ChatClient
|
||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel, Field
|
||||||
from langchain.output_parsers import PydanticOutputParser
|
from langchain.output_parsers import PydanticOutputParser
|
||||||
@@ -271,6 +270,51 @@ class DifyTool:
|
|||||||
raise Exception(f"Error while getting conversation_messages: {error}")
|
raise Exception(f"Error while getting conversation_messages: {error}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def execute_custom_sql(self, sql, params=None, fetch_type='all'):
|
||||||
|
"""
|
||||||
|
执行自定义的SQL查询或命令。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sql: 要执行的SQL语句
|
||||||
|
params: SQL参数(可选),用于参数化查询
|
||||||
|
fetch_type: 结果获取类型,可选值:'all'(返回所有行), 'one'(返回单行), 'none'(不返回结果)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
根据fetch_type返回查询结果:
|
||||||
|
- fetch_type='all': 返回包含所有行的列表,每行是一个字典
|
||||||
|
- fetch_type='one': 返回单行字典,如果没有结果则返回None
|
||||||
|
- fetch_type='none': 返回受影响的行数
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Exception: 如果执行SQL时发生错误
|
||||||
|
"""
|
||||||
|
with self.pg_sql_lock:
|
||||||
|
try:
|
||||||
|
with self.connection.cursor() as cursor:
|
||||||
|
# 执行SQL语句
|
||||||
|
cursor.execute(sql, params or ())
|
||||||
|
|
||||||
|
# 根据fetch_type处理结果
|
||||||
|
if fetch_type == 'all':
|
||||||
|
result = cursor.fetchall()
|
||||||
|
if result:
|
||||||
|
colnames = [desc[0] for desc in cursor.description]
|
||||||
|
return [dict(zip(colnames, row)) for row in result]
|
||||||
|
return []
|
||||||
|
elif fetch_type == 'one':
|
||||||
|
result = cursor.fetchone()
|
||||||
|
if result:
|
||||||
|
colnames = [desc[0] for desc in cursor.description]
|
||||||
|
return dict(zip(colnames, result))
|
||||||
|
return None
|
||||||
|
elif fetch_type == 'none':
|
||||||
|
# 对于UPDATE, INSERT, DELETE等操作,返回受影响的行数
|
||||||
|
return cursor.rowcount
|
||||||
|
else:
|
||||||
|
raise ValueError(f"不支持的fetch_type: {fetch_type}")
|
||||||
|
except (Exception, psycopg2.Error) as error:
|
||||||
|
raise Exception(f"Error executing custom SQL: {error}")
|
||||||
|
|
||||||
"""
|
"""
|
||||||
提供用于获取 Dify 应用调试信息的工具类。
|
提供用于获取 Dify 应用调试信息的工具类。
|
||||||
|
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import pandas as pd
|
|||||||
|
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
sys.path.append(os.getcwd())
|
|
||||||
from rag2_0.dify.dify_tool import DifyTool
|
from rag2_0.dify.dify_tool import DifyTool
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
|
|||||||
@@ -109,31 +109,20 @@ class Classification(BaseModel):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_format_instructions(cls):
|
def get_format_instructions(cls):
|
||||||
return """
|
return """格式如下,必须严格以纯JSON格式输出
|
||||||
格式如下,必须严格以纯JSON格式输出
|
{"vertical_classification": "垂直领域一级分类","sub_classification": "一级分类下的二级分类"}
|
||||||
{
|
字段说明:
|
||||||
"vertical_classification": "垂直领域一级分类",
|
vertical_classification 类型:str 描述:垂直领域一级分类
|
||||||
"sub_classification": "一级分类下的二级分类"
|
sub_classification 类型:str 描述:一级分类下的二级分类"""
|
||||||
}
|
|
||||||
字段说明:
|
|
||||||
vertical_classification 类型:str 描述:垂直领域一级分类
|
|
||||||
sub_classification 类型:str 描述:一级分类下的二级分类
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
class QueryRewrite(BaseModel):
|
class QueryRewrite(BaseModel):
|
||||||
rewrite:str = Field(description="问题改写")
|
rewrite:str = Field(description="问题改写")
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_format_instructions(cls):
|
def get_format_instructions(cls):
|
||||||
return """
|
return """格式如下:必须严格以纯JSON格式输出{"rewrite": "问题改写"}
|
||||||
格式如下:必须严格以纯JSON格式输出
|
字段说明:
|
||||||
{
|
rewrite 类型:str 描述:问题改写之后的内容"""
|
||||||
"rewrite": "问题改写"
|
|
||||||
}
|
|
||||||
字段说明:
|
|
||||||
"rewrite" 类型:str 描述:问题改写之后的内容
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
# 意图优化环节数据模型
|
# 意图优化环节数据模型
|
||||||
@@ -145,18 +134,12 @@ class StepBackPrompt(BaseModel):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_format_instructions(cls):
|
def get_format_instructions(cls):
|
||||||
return """
|
return """格式如下,必须严格以纯JSON格式输出
|
||||||
格式如下,必须严格以纯JSON格式输出
|
{"original_query": "原始查询","can_use_back_prompt": "原始查询是否可以进行后退提示(true/false),如果原始查询没有限定词或其他限定词语,则不能进行后退提示","step_back_query": "后退提示生成的抽象查询(多个)"}
|
||||||
{
|
字段说明:
|
||||||
"original_query": "原始查询",
|
original_query 类型:str 描述:用户输入的原始查询
|
||||||
"can_use_back_prompt": "原始查询是否可以进行后退提示(true/false),如果原始查询没有限定词或其他限定词语,则不能进行后退提示",
|
can_use_back_prompt 类型:bool 描述:原始查询是否可以进行后退提示(true/false),如果原始查询没有限定词或其他限定词语,则不能进行后退提示
|
||||||
"step_back_query": "后退提示生成的抽象查询(多个)"
|
step_back_query 类型:list[str] 描述:后退提示生成的抽象查询(多个)"""
|
||||||
}
|
|
||||||
字段说明:
|
|
||||||
"original_query" 类型:str 描述:用户输入的原始查询
|
|
||||||
"can_use_back_prompt" 类型:bool 描述:原始查询是否可以进行后退提示(true/false),如果原始查询没有限定词或其他限定词语,则不能进行后退提示
|
|
||||||
"step_back_query" 类型:list[str] 描述:后退提示生成的抽象查询(多个)
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
class FollowUpQuestions(BaseModel):
|
class FollowUpQuestions(BaseModel):
|
||||||
|
|||||||
@@ -17,21 +17,18 @@ from typing import List, Tuple, Dict, Any, Optional
|
|||||||
import re
|
import re
|
||||||
import jieba
|
import jieba
|
||||||
import time
|
import time
|
||||||
import threading
|
|
||||||
|
|
||||||
from .PromptTemplates import (classification_prompt, query_rewrite_prompt_pro,
|
from .PromptTemplates import (classification_prompt, query_rewrite_prompt_pro,
|
||||||
extract_nouns_prompt, classification_info,
|
extract_nouns_prompt, classification_info,
|
||||||
slot_filling_prompt, step_back_prompt,
|
slot_filling_prompt, step_back_prompt)
|
||||||
hyde_prompt)
|
|
||||||
|
|
||||||
from .DataModels import (
|
from .DataModels import (
|
||||||
Classification, QueryRewrite, Term, TermList,
|
Classification, QueryRewrite, Term, TermList,
|
||||||
SoftwareFunctionSlots, SoftwareTroubleShootingSlots, ProfessionalConsultingSlots,
|
SoftwareFunctionSlots, SoftwareTroubleShootingSlots, ProfessionalConsultingSlots,
|
||||||
DataProblemSlots, FileExtensionConsultingSlots, SoftwareLockSlots,
|
DataProblemSlots, FileExtensionConsultingSlots, SoftwareLockSlots,
|
||||||
InstallationDownloadSlots, ProblemDiagnosisSlots, OtherSlots, IntentAndSlotResult,
|
InstallationDownloadSlots, ProblemDiagnosisSlots, OtherSlots,
|
||||||
StepBackPrompt, HypotheticalDocument
|
StepBackPrompt
|
||||||
)
|
)
|
||||||
from .ProfessionalNounVector import ProfessionalNounRetriever, AsyncProfessionalNounRetriever
|
|
||||||
from rag2_0.tool.ModelTool import OpenAiLLM
|
from rag2_0.tool.ModelTool import OpenAiLLM
|
||||||
|
|
||||||
class AsyncIntentRecognizer:
|
class AsyncIntentRecognizer:
|
||||||
@@ -344,23 +341,25 @@ class AsyncIntentRecognizer:
|
|||||||
"""
|
"""
|
||||||
start_time = time.time() # 记录开始时间
|
start_time = time.time() # 记录开始时间
|
||||||
|
|
||||||
prompt=f"""
|
prompt=f"""当前提问内容:
|
||||||
当前提问内容:
|
<query>{query}</query>
|
||||||
<query>{query}</query>
|
对话上下文:
|
||||||
对话上下文:
|
<chat_history>
|
||||||
<chat_history>
|
{json.dumps(chat_history, ensure_ascii=False)}
|
||||||
{json.dumps(chat_history, ensure_ascii=False)}
|
</chat_history>
|
||||||
</chat_history>
|
|
||||||
|
|
||||||
1、请从当前提问内容中提取电力造价行中定额编码、定额名称、清单编码、清单名称
|
1、请从当前提问内容中提取电力造价行中定额编码、定额名称、清单编码、清单名称
|
||||||
2、请勿随机编造,如果没有提取到内容返回空的JSON
|
2、请勿随机编造,如果没有提取到内容返回空的JSON
|
||||||
3、返回结果为json格式,必须严格以纯JSON格式输出
|
3、返回结果为json格式,必须严格以纯JSON格式输出
|
||||||
{{
|
{{
|
||||||
"dinge_info_list":{{"dinge_code_list":["xxxx","xxxx"], "dinge_name_list":["xxxx","xxxx"]}},
|
"dinge_info_list":{{"dinge_code_list":["xxxx","xxxx"], "dinge_name_list":["xxxx","xxxx"]}},
|
||||||
"qingdan_info":{{"qingdan_code_list":["xxxx","xxxx"], "qingdan_name_list":["xxxx","xxxx"]}}
|
"qingdan_info":{{"qingdan_code_list":["xxxx","xxxx"], "qingdan_name_list":["xxxx","xxxx"]}}
|
||||||
}}
|
}}"""
|
||||||
"""
|
# 暂时跳过提取定额清单信息,本环节还未梳理清楚套用规则。
|
||||||
|
return {
|
||||||
|
"dinge_info_list":{"dinge_code_list":[], "dinge_name_list":[]},
|
||||||
|
"qingdan_info":{"qingdan_code_list":[], "qingdan_name_list":[]}
|
||||||
|
}
|
||||||
try:
|
try:
|
||||||
# response = await self._llm.ainvoke(prompt, response_format={"type": "json_object"}, extra_body={"enable_thinking": False})
|
# response = await self._llm.ainvoke(prompt, response_format={"type": "json_object"}, extra_body={"enable_thinking": False})
|
||||||
response = await self._llm.ainvoke(prompt, response_format={"type": "json_object"})
|
response = await self._llm.ainvoke(prompt, response_format={"type": "json_object"})
|
||||||
@@ -489,6 +488,7 @@ class AsyncIntentRecognizer:
|
|||||||
# 特殊处理 锁相关咨询
|
# 特殊处理 锁相关咨询
|
||||||
if classification.vertical_classification == "安装下载注册" and classification.sub_classification == "软件锁类":
|
if classification.vertical_classification == "安装下载注册" and classification.sub_classification == "软件锁类":
|
||||||
process_lock_start_time = time.time()
|
process_lock_start_time = time.time()
|
||||||
|
# 特殊处理提问只有锁号的问题,手动将问题改写为特定格式
|
||||||
rewrite.rewrite = self._process_lock_related_query(rewrite.rewrite)
|
rewrite.rewrite = self._process_lock_related_query(rewrite.rewrite)
|
||||||
process_lock_end_time = time.time()
|
process_lock_end_time = time.time()
|
||||||
process_lock_time = process_lock_end_time - process_lock_start_time
|
process_lock_time = process_lock_end_time - process_lock_start_time
|
||||||
|
|||||||
@@ -28,14 +28,14 @@ extract_nouns_prompt="""
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
classification_info="""【垂直领域分类】:
|
classification_info="""【垂直领域分类】:
|
||||||
1. 软件问题 -- 指涉及软件使用、功能询问、软件故障排查等方面的提问或请求。
|
1. 软件问题 -- 涉及软件使用/功能/操作或故障排查。
|
||||||
2. 业务问题 -- 指涉及电力造价领域专业知识、造价费用计算等电力造价业务知识
|
2. 业务问题 -- 涉及电力造价专业知识、计价规则或造价数据计算等。
|
||||||
3. 安装下载注册 -- 指涉及软件(或插件)安装下载、注册、激活等操作类问题。
|
3. 安装下载注册 -- 及软件或插件的安装、下载、注册、激活、文件扩展名、软件锁等。
|
||||||
4. 固定话术类 -- 指涉及需要固定话术回答的问题,如:规费咨询、调差下载更新。
|
4. 固定话术类 -- 需用固定话术回复的问题,如:规费咨询、调差下载更新。
|
||||||
5. 其他 -- 指与软件或电力造价专业无关的日常对话、问候、感慨、情绪表达等。
|
5. 其他 -- 与软件或电力造价无关的一般对话(问候、情绪等)。
|
||||||
|
|
||||||
【软件问题包括以下两类】:
|
【软件问题包括以下两类】:
|
||||||
1. 软件功能:询问软件功能的使用、功能操作(调整)、功能位置、如何设置、如何转换、如何导入到软件、如何安装到软件等侧重软件主体
|
1. 软件功能:询问软件如何使用、设置、导入、在软件中安装/转换等(以软件为主体)。
|
||||||
示例:ywlk怎么安装到软件中? ywlk是文件后缀名 ---> 将文件导入到软件中
|
示例:ywlk怎么安装到软件中? ywlk是文件后缀名 ---> 将文件导入到软件中
|
||||||
2. 故障排查:软件运行异常、软件报错、软件显示错误等
|
2. 故障排查:软件运行异常、软件报错、软件显示错误等
|
||||||
|
|
||||||
@@ -56,188 +56,90 @@ classification_info="""【垂直领域分类】:
|
|||||||
4. 问题排查类:软件安装下载失败、报错,系统兼容性问题等
|
4. 问题排查类:软件安装下载失败、报错,系统兼容性问题等
|
||||||
|
|
||||||
【固定话术类包括以下类】:
|
【固定话术类包括以下类】:
|
||||||
|
|
||||||
1. 规费咨询
|
1. 规费咨询
|
||||||
**以下两种情况才属于该类**
|
**以下两种情况才属于该类**
|
||||||
1、当询问规费(如社会保障费和住房公积金)费率是/填多少
|
1、当询问规费(如社会保障费和住房公积金)费率是/填多少
|
||||||
2、去哪里获取规费费率
|
2、去哪里获取规费费率
|
||||||
**其余涉及规费的属于其他垂直领域分类**
|
**其余涉及规费的属于其他垂直领域分类**
|
||||||
|
|
||||||
2. 调差下载更新
|
2. 调差下载更新
|
||||||
**以下两种情况才属于该类**
|
**以下两种情况才属于该类**
|
||||||
1、询问如何下载导入调差文件、调差插件
|
1、询问如何下载导入调差文件、调差插件
|
||||||
2、询问如何更新导入调差文件、调差插件
|
2、询问如何更新导入调差文件、调差插件
|
||||||
调差:价格水平差异调整
|
调差 = 价格水平差异调整
|
||||||
**其余涉及调差的属于其他垂直领域分类**
|
**其余涉及调差的属于其他垂直领域分类**
|
||||||
|
|
||||||
【其他】:
|
【其他】:
|
||||||
1. 其他
|
1. 其他
|
||||||
|
|
||||||
分类优先级:
|
分类优先级:固定话术类 > 软件问题 、 业务问题 、 安装下载注册 > 其他"""
|
||||||
固定话术类 > 软件问题 、 业务问题 、 安装下载注册 > 其他
|
|
||||||
"""
|
|
||||||
|
|
||||||
classification_prompt="""
|
classification_prompt="""用户在使用电力造价软件或咨询电力造价相关问题,请将用户输入(结合历史对话,如有)归为以下垂直领域之一:
|
||||||
用户正在使用电力造价软件或想询问电力造价领域相关知识,你需要根据用户的输入内容集合历史对话(如果存在),将其归类为以下垂直领域之一:
|
{classification_info}
|
||||||
{classification_info}
|
|
||||||
|
|
||||||
## 【历史对话记录】
|
## 【历史对话记录】
|
||||||
{chat_history}
|
{chat_history}
|
||||||
|
|
||||||
【用户输入】:
|
【用户输入】:
|
||||||
{user_input}
|
{user_input}
|
||||||
|
|
||||||
【输出格式要求】:
|
【输出格式要求】:
|
||||||
{output_format}
|
|
||||||
|
|
||||||
【示例】
|
|
||||||
用户输入1: 技改T1怎样新建工程
|
|
||||||
输出1:
|
|
||||||
{{
|
|
||||||
"vertical_classification":"软件问题",
|
|
||||||
"sub_classification":"软件功能"
|
|
||||||
}}
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
query_rewrite_prompt = """
|
|
||||||
|
|
||||||
# 电力造价专业问答优化工程师
|
|
||||||
|
|
||||||
你是一名电力造价专业问答优化工程师,负责通过专业关键词集合替换原始问题中的非专业表述以提升知识库检索准确率。
|
|
||||||
|
|
||||||
## 核心任务
|
|
||||||
将用户的原始问题结合专业术语库进行规范化重构,提高知识库检索的准确性和专业性。
|
|
||||||
|
|
||||||
## 处理流程
|
|
||||||
### 第一阶段:输入解析
|
|
||||||
1. 解析基础信息
|
|
||||||
- 原始问题(需保留核心语义):{query}
|
|
||||||
- 关键词集合:{keywords}
|
|
||||||
|
|
||||||
### 第二阶段:匹配分析
|
|
||||||
**匹配规则:**
|
|
||||||
1. 检查原始问题中是否包含关键词集合中的`name`字段或`synonymous`字段中的任何词汇
|
|
||||||
2. 统计匹配的术语数量
|
|
||||||
3. 判断执行路径:
|
|
||||||
- 匹配术语 ≥ 1个 → 执行重构流程
|
|
||||||
- 匹配术语 = 0个 → 直接输出原始问题
|
|
||||||
|
|
||||||
### 第三阶段:问题重构
|
|
||||||
**重构原则(按优先级排序):**
|
|
||||||
|
|
||||||
1. **语义保真**:严格保持原问题的核心意图和诉求
|
|
||||||
2. **术语规范**:
|
|
||||||
- 将匹配到的同义词替换为对应的标准术语(name字段)
|
|
||||||
- 对在关键词中的标准术语使用【】进行标记
|
|
||||||
- 保留在原问题中未在关键词库中的专业术语、限定词和修饰词
|
|
||||||
3. **结构优化**:
|
|
||||||
- 保持原问题的语态特征5W2H
|
|
||||||
- 保持主谓宾结构清晰
|
|
||||||
- 保留时间、版本等限定条件
|
|
||||||
|
|
||||||
**术语处理规则:**
|
|
||||||
- 优先级1:保留原问题中的专业术语、限定词和修饰词(即使不在关键词库中)
|
|
||||||
- 优先级2:将同义词替换为标准术语并用【】标记
|
|
||||||
- 优先级3:对原问题中已存在的标准术语添加【】标记
|
|
||||||
|
|
||||||
# 输出规范
|
|
||||||
{output_format}
|
{output_format}
|
||||||
|
|
||||||
# 示范案例库
|
【示例】
|
||||||
▶ 案例1(有效匹配)
|
用户输入1: 技改T1怎样新建工程
|
||||||
输入:
|
输出1:
|
||||||
原始问题:怎么把旧版西藏定额工程转到Z1新版
|
{{
|
||||||
关键词:【'老版本定额升级', '批量设置定额', '西藏造价软件Z1'】
|
"vertical_classification":"软件问题",
|
||||||
输出:
|
"sub_classification":"软件功能"
|
||||||
{{"rewrite":"【西藏造价软件Z1】如何执行【老版本定额升级】操作?"}}
|
}}"""
|
||||||
|
|
||||||
▶ 案例2(无效匹配)
|
query_rewrite_prompt_pro="""# 问答优化工程师
|
||||||
输入:
|
|
||||||
原始问题:程序界面文字显示过小如何处理?
|
|
||||||
关键词:【'定额升级', '工程批量导入'】
|
|
||||||
输出:
|
|
||||||
{{"rewrite":"程序界面文字显示过小如何处理?"}}
|
|
||||||
|
|
||||||
▶ 案例3(部分匹配,但保留修饰限定词)
|
|
||||||
输入:
|
|
||||||
原始问题:"配网软件D3能导出清单的计算公式吗?
|
|
||||||
关键词:【'配网工程计价通D3软件', '计算式'】
|
|
||||||
输出(保留限定修饰词"清单"):
|
|
||||||
{{"rewrite":"【配网工程计价通D3软件】能导出清单的【计算式】吗?"}}
|
|
||||||
|
|
||||||
## 质量检查清单
|
|
||||||
执行前请确认:
|
|
||||||
- [ ] 是否保持了原问题的核心诉求?
|
|
||||||
- [ ] 是否正确执行了同义词替换?
|
|
||||||
- [ ] 是否保留了原问题中的专业术语和限定条件?
|
|
||||||
- [ ] 是否正确使用了【】标记?
|
|
||||||
- [ ] 重构后的问题是否自然流畅?
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
query_rewrite_prompt_pro="""
|
|
||||||
# 问答优化工程师
|
|
||||||
**角色**:基于历史对话和术语库重构问题,提升知识库检索准确率。
|
**角色**:基于历史对话和术语库重构问题,提升知识库检索准确率。
|
||||||
**最高准则**:
|
**最高准则**:
|
||||||
1、保持问题核心意图,允许指代消除
|
1、保持问题核心意图,允许指代消除
|
||||||
2. 所有新增内容必须源于历史对话或聊天背景,禁止捏造。
|
2. 所有新增内容必须源于历史对话或聊天背景,禁止捏造。
|
||||||
3. 归一化替换需严格全词匹配:查询中的词必须与术语库同义词完全一致(不区分大小写)。部分匹配(如子字符串)或不匹配,保留原词
|
3. 归一化替换需严格全词匹配:查询中的词必须与术语库同义词完全一致(不区分大小写)。部分匹配(如子字符串)或不匹配,保留原词
|
||||||
|
|
||||||
|
|
||||||
## 核心原则
|
## 核心原则
|
||||||
1. **指代消除 → 当指示代词("那"/"这")出现时,强制继承历史对话的最新核心主题(如功能或任务),并应用到当前主体。**
|
1. **指代消除 → 当指示代词("那"/"这")出现时,继承历史对话的最新核心主题(如功能或任务),并应用到当前主体。**
|
||||||
2. 术语规范 → 提问中出现的同义词(synonymous)替换为标准词(name)并【】标记
|
2. 术语规范 → 提问中出现的同义词(synonymous)替换为标准词(name)并【】标记
|
||||||
3. 语义保真 → 保持问题核心意图,允许指代消除
|
3. 语义保真 → 保持问题核心意图,允许指代消除
|
||||||
|
|
||||||
## 归一化替换规则
|
## 归一化替换规则
|
||||||
1. 只有当问题中的词与术语库中某一项的同义词列表中的某个词完全相同时,才替换为对应的标准词
|
1. 只有当问题中的词与术语库中某一项的同义词列表中的某个词完全相同时,才替换为对应的标准词
|
||||||
|
|
||||||
|
|
||||||
## 处理流程
|
## 处理流程
|
||||||
### 一、输入解析
|
### 一、输入解析
|
||||||
- 原始问题(需保留核心语义):
|
- 原始问题(需保留核心语义):
|
||||||
<query> {query} </query>
|
<query> {query} </query>
|
||||||
|
|
||||||
- 术语库集合(用于同义词转标准词环节):
|
- 术语库集合(用于同义词转标准词环节):
|
||||||
<keywords>
|
<keywords>
|
||||||
{keywords}
|
{keywords}
|
||||||
</keywords>
|
</keywords>
|
||||||
|
|
||||||
- 历史对话记录:
|
- 历史对话记录:
|
||||||
<history>
|
<history>
|
||||||
{chat_history}
|
{chat_history}
|
||||||
</history>
|
</history>
|
||||||
|
|
||||||
### 一、重构流程
|
### 二、重构流程
|
||||||
1、问题是否指代不明,指代不明时根据历史对话补充上下文
|
1、问题是否指代不明,指代不明时根据历史对话补充上下文
|
||||||
2、问题是否包含同义词,包含同义词时进行同义词转标准词
|
2、问题是否包含同义词,包含同义词时进行同义词转标准词
|
||||||
|
|
||||||
### 三、重构优先级
|
### 三、重构优先级
|
||||||
1. **指代消除 → 当指示代词出现时,结合历史对话补充上下文**
|
1. **指代消除 → 当指示代词出现时,结合历史对话补充上下文**
|
||||||
2. 同义词转标准词 → 将提问中出现的同义词(synonymous)替换为对应标准词(name) 并使用【】标记
|
2. 同义词转标准词 → 将提问中出现的同义词(synonymous)替换为对应标准词(name) 并使用【】标记
|
||||||
3. 结构优化 → 保持原问题的5W2H特征,指代消除、背景继承下允许微调意图。
|
3. 结构优化 → 指代消除、背景继承下允许微调提问。
|
||||||
|
|
||||||
## 输出规范
|
## 输出规范
|
||||||
{output_format}
|
{output_format}
|
||||||
|
|
||||||
## 示例模仿
|
|
||||||
示例1:
|
|
||||||
输入:
|
|
||||||
<history>
|
|
||||||
'user': '811623110668是哪款软件的锁?
|
|
||||||
'assistant': 可通过查询软件锁的许可证信息,通过许可证名称可以判断对应软件
|
|
||||||
</history>
|
|
||||||
<query> ”锂离子电池储能安装“ </query>
|
|
||||||
输出:
|
|
||||||
{{"rewrite": "许可证名称为‘锂离子电池储能安装’对应什么软件?"}}
|
|
||||||
|
|
||||||
## 质量自检
|
## 质量自检
|
||||||
- [] **主题是否合理继承?**
|
- [] **主题是否合理继承?**
|
||||||
- [] 核心诉求是否保留?
|
- [] 核心诉求是否保留?
|
||||||
- [] 语句是否自然流畅?
|
- [] 语句是否自然流畅?
|
||||||
- [] 避免补充无关信息
|
- [] 避免补充无关信息"""
|
||||||
"""
|
|
||||||
|
|
||||||
slot_filling_prompt = """
|
slot_filling_prompt = """
|
||||||
你是一个专业的电力造价领域问题槽位填充助手。你需要从用户问题中提取关键信息,并填充到对应的数据结构中。
|
你是一个专业的电力造价领域问题槽位填充助手。你需要从用户问题中提取关键信息,并填充到对应的数据结构中。
|
||||||
@@ -282,21 +184,18 @@ slot_filling_prompt = """
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
# 意图优化环节提示词模板
|
# 意图优化环节提示词模板
|
||||||
step_back_prompt = """
|
step_back_prompt = """# 后退提示生成器
|
||||||
# 后退提示生成器
|
你是电力造价领域的问题抽象专家。任务是把用户的具体问题抽象成更高层次的问题,帮助理解其核心意图。
|
||||||
|
|
||||||
你是一个专业的电力造价领域问题抽象专家。你的任务是根据用户的具体问题,提出一个更抽象、更高层次的问题,帮助系统更好地理解用户的意图。
|
|
||||||
|
|
||||||
## 任务说明
|
## 任务说明
|
||||||
1. 分析用户的原始问题,理解其核心意图和需求
|
1. 理解用户原始问题的核心
|
||||||
2. 考虑历史对话和会话背景,理解用户当前问题的上下文
|
2. 考虑上下文
|
||||||
3. 生成更抽象、更高层次的问题,称为"后退问题",后退问题可以生成多个,依次后退到更抽象、更高层次的问题
|
3. 生成逐级抽象的“后退问题”
|
||||||
4. 后退问题应该:
|
4. 后退问题应该:
|
||||||
- 更加通用和抽象,不应包含原始问题的具体细节(包括场景限定、界面限定等其他限定词语)
|
- 更加通用和抽象,不应包含原始问题的具体细节(包括场景限定、界面限定等其他限定词语)
|
||||||
- 涵盖原始问题的核心主题
|
- 涵盖原始问题的核心主题
|
||||||
- 去除过于具体的限制条件(如时间、地点、特定版本、特定工程等)
|
- 去除过于具体的限制条件(如时间、地点、特定版本、特定工程等)
|
||||||
- 保持在同一领域和主题范围内
|
- 保持在同一领域和主题范围内
|
||||||
- 依次移除问题中的限定词或者修饰词
|
- 依次移除问题中的限定词或者修饰词
|
||||||
|
|
||||||
## 输入
|
## 输入
|
||||||
用户原始问题: {query}
|
用户原始问题: {query}
|
||||||
@@ -306,23 +205,13 @@ step_back_prompt = """
|
|||||||
{output_format}
|
{output_format}
|
||||||
|
|
||||||
## 示例
|
## 示例
|
||||||
原始问题: "2023版本如何在Windows 11系统上导入单位工程量清单?"
|
|
||||||
后退问题:
|
|
||||||
{{
|
|
||||||
"original_query": "2023版本如何在Windows 11系统上导入单位工程量清单?",
|
|
||||||
"can_use_back_prompt": true,
|
|
||||||
"step_back_query": ["如何在Windows 11系统上导入单位工程量清单?", "如何导入单位工程量清单?"]
|
|
||||||
}}
|
|
||||||
|
|
||||||
原始问题: "某个设备更换后,如何在系统中更新对应的定额?"
|
原始问题: "某个设备更换后,如何在系统中更新对应的定额?"
|
||||||
后退问题:
|
后退问题:
|
||||||
{{
|
{{
|
||||||
"original_query": "某个设备更换后,如何在系统中更新对应的定额?",
|
"original_query": "某个设备更换后,如何在系统中更新对应的定额?",
|
||||||
"can_use_back_prompt": true,
|
"can_use_back_prompt": true,
|
||||||
"step_back_query": ["如何更新设备对应的定额?", "如何更新定额?"]
|
"step_back_query": ["如何更新设备对应的定额?", "如何更新定额?"]
|
||||||
}}
|
}}"""
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
follow_up_questions_prompt = """
|
follow_up_questions_prompt = """
|
||||||
# 后续问题生成器
|
# 后续问题生成器
|
||||||
|
|||||||
+15
-33
@@ -193,7 +193,9 @@ class OpenAiLLM:
|
|||||||
messages=[{'role': 'user', 'content': user_prompt}],
|
messages=[{'role': 'user', 'content': user_prompt}],
|
||||||
**kwargs
|
**kwargs
|
||||||
)
|
)
|
||||||
return completion.choices[0].message
|
message = completion.choices[0].message
|
||||||
|
message.usage = completion.usage
|
||||||
|
return message
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise RuntimeError(f"OpenAiLLM:invoke:error:{str(e)}") from e
|
raise RuntimeError(f"OpenAiLLM:invoke:error:{str(e)}") from e
|
||||||
|
|
||||||
@@ -225,36 +227,16 @@ class OpenAiLLM:
|
|||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
# 测试重排模型
|
# 测试重排模型
|
||||||
reranker = SiliconFlowReRankerModel()
|
base_url = os.getenv("OPENAI_API_BASE")
|
||||||
|
model_name = os.getenv("MODEL_NAME", "gpt-3.5-turbo")
|
||||||
# 测试用例1:简单问题
|
# 初始化LLM
|
||||||
query = "如何通过【电力经济评价软件】的【打开】功能加载工程文件?"
|
llm_params = {
|
||||||
documents = []
|
"temperature": 0.4, # 降低随机性,使结果更确定
|
||||||
results = reranker.rerank(query, documents)
|
"top_p": 0.7,
|
||||||
print(f"测试用例1 - 查询:{query}")
|
"model": model_name,
|
||||||
for idx, item in enumerate(results):
|
"base_url": base_url
|
||||||
print(f"{idx+1}. 文档: {item['document']}, 分数: {item['score']}")
|
}
|
||||||
print("-" * 50)
|
|
||||||
|
|
||||||
# 异步测试示例
|
|
||||||
async def test_async():
|
|
||||||
# 测试异步嵌入
|
|
||||||
api_key = APIKeyManager.get_api_key()
|
|
||||||
embeddings = XinferenceEmbeddings(api_key=api_key)
|
|
||||||
query_embedding = await embeddings.embed_query_async("测试查询")
|
|
||||||
print(f"异步嵌入向量维度: {len(query_embedding)}")
|
|
||||||
|
|
||||||
# 测试异步重排序
|
|
||||||
results = await SiliconFlowReRankerModel.rerank_async(query, documents)
|
|
||||||
print(f"异步重排序结果数量: {len(results)}")
|
|
||||||
|
|
||||||
# 测试异步LLM调用
|
|
||||||
llm = OpenAiLLM()
|
|
||||||
response = await llm.ainvoke("你好,请简单介绍一下自己")
|
|
||||||
print(f"异步LLM响应: {response.content}")
|
|
||||||
|
|
||||||
# 如果需要运行异步测试,取消下面的注释
|
|
||||||
# import asyncio
|
|
||||||
# asyncio.run(test_async())
|
|
||||||
|
|
||||||
|
|
||||||
|
_llm = OpenAiLLM(**llm_params)
|
||||||
|
promt="""你好,请简单介绍一下自己"""
|
||||||
|
print(_llm.invoke(promt))
|
||||||
Executable
+111
@@ -0,0 +1,111 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
# 专用脚本:启动 rag2_0.api.intent_recognition_api 服务
|
||||||
|
# 功能:启动前检测screen是否存在,存在则结束,最后启动服务
|
||||||
|
|
||||||
|
set -euo pipefail
|
||||||
|
|
||||||
|
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||||
|
SESSION_NAME="intent_recognition_api"
|
||||||
|
SERVICE_PORT="8001"
|
||||||
|
START_COMMAND="cd \"$SCRIPT_DIR\" && uv run uvicorn rag2_0.api.intent_recognition_api:app --host 0.0.0.0 --port 8001 --workers 4"
|
||||||
|
|
||||||
|
echo "[脚本] 启动 intent_recognition_api 服务..."
|
||||||
|
|
||||||
|
# 检查screen会话是否存在
|
||||||
|
exists_session() {
|
||||||
|
# 使用严格匹配,避免误判
|
||||||
|
if screen -ls 2>/dev/null | grep -q "\\.${SESSION_NAME}\\s"; then
|
||||||
|
return 0
|
||||||
|
fi
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
# 按端口获取监听该端口的任意一个PID,优先用 ss,其次 lsof
|
||||||
|
pids_on_port() {
|
||||||
|
# 从 ss 提取 pid 列表
|
||||||
|
local ss_pids
|
||||||
|
ss_pids=$(ss -lptn 2>/dev/null \
|
||||||
|
| grep -E ":${SERVICE_PORT}\\b" \
|
||||||
|
| awk '{print $NF}' \
|
||||||
|
| sed -n 's/.*pid=\([0-9]\+\),.*/\1/p' \
|
||||||
|
| sort -u)
|
||||||
|
if [[ -n "$ss_pids" ]]; then
|
||||||
|
echo "$ss_pids"
|
||||||
|
return 0
|
||||||
|
fi
|
||||||
|
# 从 lsof 提取 pid 列表
|
||||||
|
if command -v lsof >/dev/null 2>&1; then
|
||||||
|
local lsof_pids
|
||||||
|
lsof_pids=$(lsof -nP -i :"$SERVICE_PORT" -sTCP:LISTEN -t 2>/dev/null | sort -u)
|
||||||
|
if [[ -n "$lsof_pids" ]]; then
|
||||||
|
echo "$lsof_pids"
|
||||||
|
return 0
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
# 根据端口优雅终止(TERM)并在必要时强制(KILL)清理进程
|
||||||
|
kill_by_port() {
|
||||||
|
local pids
|
||||||
|
pids=$(pids_on_port || true)
|
||||||
|
if [[ -z "$pids" ]]; then
|
||||||
|
return 0
|
||||||
|
fi
|
||||||
|
echo "[清理] 端口 $SERVICE_PORT 仍被占用,发送 SIGTERM 到: $pids"
|
||||||
|
kill -TERM $pids 2>/dev/null || true
|
||||||
|
sleep 2
|
||||||
|
# 再次检查
|
||||||
|
local left
|
||||||
|
left=$(pids_on_port || true)
|
||||||
|
if [[ -n "$left" ]]; then
|
||||||
|
echo "[强制] 端口 $SERVICE_PORT 仍占用,发送 SIGKILL 到: $left"
|
||||||
|
kill -KILL $left 2>/dev/null || true
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
# 停止已存在的服务
|
||||||
|
stop_existing_service() {
|
||||||
|
# 1) 先尝试关闭 screen 会话
|
||||||
|
if exists_session "$SESSION_NAME"; then
|
||||||
|
echo "[停止] 发现已存在的 screen 会话 '$SESSION_NAME',正在结束..."
|
||||||
|
screen -S "$SESSION_NAME" -X quit || true
|
||||||
|
echo "[停止] screen 会话 '$SESSION_NAME' 已结束"
|
||||||
|
else
|
||||||
|
echo "[提示] 未发现 screen 会话: $SESSION_NAME"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# 2) 等待释放端口
|
||||||
|
sleep 2
|
||||||
|
|
||||||
|
# 3) 如果仍占用,按端口清理
|
||||||
|
if ss -lptn 2>/dev/null | grep -E -q ":${SERVICE_PORT}\\b" || (command -v lsof >/dev/null 2>&1 && lsof -i :"$SERVICE_PORT" -sTCP:LISTEN >/dev/null 2>&1); then
|
||||||
|
echo "[清理] 端口 $SERVICE_PORT 仍被占用,正在清理..."
|
||||||
|
kill_by_port
|
||||||
|
echo "[清理] 端口 $SERVICE_PORT 清理完成"
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
# 启动服务
|
||||||
|
start_new_service() {
|
||||||
|
echo "[启动] 准备启动 intent_recognition_api 服务..."
|
||||||
|
echo "[启动] 启动命令: $START_COMMAND"
|
||||||
|
screen -dmS "$SESSION_NAME" bash -c "$START_COMMAND"
|
||||||
|
echo "[启动] intent_recognition_api 服务已启动,screen 会话名: '$SESSION_NAME'"
|
||||||
|
echo "[启动] 服务运行在端口: $SERVICE_PORT"
|
||||||
|
echo "[提示] 可使用 'screen -r $SESSION_NAME' 查看服务输出"
|
||||||
|
}
|
||||||
|
|
||||||
|
# 主流程
|
||||||
|
main() {
|
||||||
|
# 1. 停止已存在的服务
|
||||||
|
stop_existing_service
|
||||||
|
|
||||||
|
# 2. 启动新服务
|
||||||
|
start_new_service
|
||||||
|
|
||||||
|
echo "[完成] intent_recognition_api 服务启动脚本执行完成"
|
||||||
|
}
|
||||||
|
|
||||||
|
main
|
||||||
Reference in New Issue
Block a user