Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 603c8122d4 | |||
| 500c8c166c |
@@ -3034,11 +3034,6 @@
|
||||
"synonymous": [],
|
||||
"description": "VC运行时库的安装程序"
|
||||
},
|
||||
{
|
||||
"name": "技改检修计价通T1",
|
||||
"synonymous": [],
|
||||
"description": "电力行业技改检修计价软件的名称"
|
||||
},
|
||||
{
|
||||
"name": "博微电力建设计价通",
|
||||
"synonymous": [],
|
||||
@@ -9885,11 +9880,6 @@
|
||||
],
|
||||
"description": "报表参数配置项,控制材料/设备的显示筛选"
|
||||
},
|
||||
{
|
||||
"name": "技改检修清单T1",
|
||||
"synonymous": [],
|
||||
"description": "软件版本标识,对应技术改造和检修工程清单类型"
|
||||
},
|
||||
{
|
||||
"name": "导入EXCEL版物料库",
|
||||
"synonymous": [],
|
||||
@@ -10023,7 +10013,8 @@
|
||||
{
|
||||
"name": "技改检修清单计价通T1软件",
|
||||
"synonymous": [
|
||||
"技改检修清单软件"
|
||||
"技改检修清单软件",
|
||||
"技改检修清单T1"
|
||||
],
|
||||
"description": "电力行业用于技改检修清单计价的软件名称"
|
||||
},
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -1,618 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
完整性问题判断工具
|
||||
|
||||
此脚本用于读取Excel文件中的问题,调用LLM判断问题是否完整,并将结果保存到Excel文件中。
|
||||
|
||||
用法示例:
|
||||
python judge_query_full.py -i "问题数据.xlsx" -o "完整问题结果.xlsx" -w 50 -c 0
|
||||
|
||||
命令行参数:
|
||||
-i, --input: 输入Excel文件路径
|
||||
-o, --output: 输出Excel文件路径
|
||||
-w, --workers: 并发处理的最大线程数
|
||||
-c, --column: 要处理的问题所在列的索引(从0开始)
|
||||
-t, --test: 测试单个问题,不处理Excel文件
|
||||
"""
|
||||
|
||||
import pandas as pd
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import re
|
||||
import argparse
|
||||
import traceback
|
||||
from pathlib import Path
|
||||
from rag2_0.tool.ModelTool import OpenAiLLM
|
||||
from rag2_0.tool.APIKeyManager import APIKeyManager
|
||||
from openpyxl.utils import get_column_letter
|
||||
from openpyxl.styles import Alignment, PatternFill, Font, Border, Side
|
||||
from tqdm import tqdm
|
||||
import concurrent.futures
|
||||
import threading
|
||||
|
||||
# Default settings
DEFAULT_EXCEL_PATH = r"/data/QueryRewrite/data/excel/7000条对话数据.xlsx"
DEFAULT_OUTPUT_PATH = r"/data/QueryRewrite/data/excel/7000条对话数据_完整问题结果.xlsx"
DEFAULT_MAX_WORKERS = 50


class QueryCompletenessJudge:
    """
    Judge whether user questions are complete by asking an LLM.

    The class can process every question in one column of an Excel file
    concurrently and write the questions judged complete to a styled output
    workbook, or it can evaluate a single question and print the verdict.
    """

    def __init__(self, input_path=DEFAULT_EXCEL_PATH, output_path=DEFAULT_OUTPUT_PATH,
                 max_workers=DEFAULT_MAX_WORKERS, column_index=0):
        """
        Store the run configuration and create the LLM client.

        Args:
            input_path (str): Path of the input Excel file.
            output_path (str): Path of the output Excel file.
            max_workers (int): Maximum number of worker threads.
            column_index (int): Zero-based index of the question column.
        """
        self.input_path = input_path
        self.output_path = output_path
        self.max_workers = max_workers
        self.column_index = column_index
        self.llm_client = self._create_llm_client()

    @staticmethod
    def _coerce_bool(value):
        """
        Convert the ``is_complete`` value returned by the LLM to a real bool.

        Models sometimes emit the flag as a string; ``bool("false")`` would be
        True, so strings are compared against a list of affirmative spellings.
        """
        if isinstance(value, str):
            return value.strip().lower() in ("true", "yes", "1", "是", "完整")
        return bool(value)

    def _extract_json_from_response(self, full_answer):
        """
        Extract the JSON verdict from an LLM response.

        Args:
            full_answer (str): Full response text of the LLM.

        Returns:
            dict | None: The parsed JSON object, or None when the response is
            not a string or contains no parsable JSON object.
        """
        if not isinstance(full_answer, str) or not full_answer:
            return None

        # Preferred form: the ```json fenced block requested by the prompt.
        fenced = re.search(r'```json\s*(.*?)\s*```', full_answer, re.DOTALL)
        if fenced:
            try:
                parsed = json.loads(fenced.group(1))
            except json.JSONDecodeError:
                parsed = None
            # Only a JSON object is usable: callers call .get() on the result.
            if isinstance(parsed, dict):
                return parsed

        # Fallback: decode a bare JSON object starting at each "{" in turn.
        # raw_decode stops at the end of the object, so trailing text (even
        # text containing braces) no longer breaks the parse.
        decoder = json.JSONDecoder()
        start = full_answer.find("{")
        while start != -1:
            try:
                candidate, _ = decoder.raw_decode(full_answer[start:])
            except json.JSONDecodeError:
                candidate = None
            if isinstance(candidate, dict) and "is_complete" in candidate:
                return candidate
            start = full_answer.find("{", start + 1)

        return None

    def _create_llm_prompt(self, question):
        """
        Build the prompt sent to the LLM.

        Args:
            question (str): The question whose completeness is judged.

        Returns:
            str: The formatted prompt.
        """
        return f"""你是一个电力造价行业专家,用户正在使用电力造价软件,并提出了相关问题。请分析以下问题是否完整。

问题:{question}

首先,分析这个问题的结构和内容,思考它是否包含足够的信息来表达清晰的意图。
考虑以下几点:
1. 问题是否有明确的核心意图,不需要面面俱到
2. 问题是否缺少必要的上下文
3. **问题如果涉及软件相关,则只需要包含:软件名称、软件功能或软件目的即可**


在你的分析之后,请用JSON格式给出最终结论,格式如下:
```json
{{
    "is_complete": true或false,
    "reason": "判断原因的简要说明",
    "confidence": 0到100之间的数值,表示你对判断的置信度
}}
```

请确保JSON格式正确,以便于程序解析。"""

    def _create_llm_client(self, api_key=None):
        """
        Create the LLM client.

        Args:
            api_key (str, optional): API key; when None it is obtained from
                APIKeyManager.

        Returns:
            OpenAiLLM: The LLM client instance.
        """
        if api_key is None:
            api_key = APIKeyManager.get_api_key()

        return OpenAiLLM(
            api_key=api_key,
            base_url="https://api.siliconflow.cn/v1",  # adjust to the actual deployment
            model="deepseek-ai/DeepSeek-V3",  # adjust to the actual deployment
            temperature=0.2,
            # NOTE(review): the prompt asks for an analysis followed by a JSON
            # verdict; 100 tokens looks too small for that and may truncate
            # the JSON - confirm against OpenAiLLM's max_tokens semantics.
            max_tokens=100
        )

    def is_question_complete(self, question):
        """
        Ask the LLM whether a question is complete.

        The call is retried up to three times with a doubling delay when an
        exception is raised.

        Args:
            question (str): The question to judge.

        Returns:
            tuple: (bool, str) - the completeness verdict and the full LLM
            reply (or an error description after the final failed retry).
        """
        max_retries = 3
        retry_delay = 2  # seconds; doubled after every failed attempt

        for attempt in range(max_retries + 1):
            try:
                prompt = self._create_llm_prompt(question)
                response = self.llm_client.invoke(prompt)

                # The client may return an object with .content or plain text.
                if hasattr(response, 'content'):
                    full_answer = response.content
                else:
                    full_answer = str(response)

                result = self._extract_json_from_response(full_answer)
                if result:
                    is_complete = self._coerce_bool(result.get("is_complete", False))
                    return is_complete, full_answer

                # No usable JSON: fall back to a keyword check. "不完整"
                # (incomplete) contains "完整" (complete), so the negative
                # form has to be excluded explicitly.
                head = full_answer[:100]
                is_complete = "完整" in head and "不完整" not in head
                return is_complete, full_answer

            except Exception as e:
                if attempt < max_retries:
                    # Back off before the next attempt.
                    time.sleep(retry_delay)
                    retry_delay *= 2
                    continue

                # Retries exhausted: report the failure to the caller.
                stack_trace = traceback.format_exc()
                print(f"错误: 经过 {max_retries} 次重试后仍然失败: {str(e)}")
                print(f"堆栈跟踪信息:\n{stack_trace}")
                return False, f"错误: 经过 {max_retries} 次重试后仍然失败: {str(e)}\n堆栈摘要: {type(e).__name__}"

        # Unreachable; kept so the function always returns a tuple.
        return False, "未知错误:重试机制逻辑错误"

    def _process_question(self, args, complete_questions, progress_counter, progress_lock, complete_questions_lock, pbar):
        """
        Judge one question and update the shared progress state.

        Args:
            args (tuple): (row index, question text, LLM client, total number
                of questions). Only the question text is used here.
            complete_questions (list): Shared list collecting complete questions.
            progress_counter (dict): Shared counters ("processed", "complete",
                "incomplete").
            progress_lock (threading.Lock): Guards progress_counter and pbar.
            complete_questions_lock (threading.Lock): Guards complete_questions.
            pbar (tqdm): Progress bar.
        """
        _index, question, _llm_client, _total_questions = args

        # Skip empty cells without calling the LLM.
        if question is None or pd.isna(question) or str(question).strip() == "":
            with progress_lock:
                progress_counter["processed"] += 1
                pbar.update(1)
            return None

        is_complete, full_answer = self.is_question_complete(question)

        if is_complete:
            parsed_json = self._extract_json_from_response(full_answer)

            if parsed_json:
                # Structured verdict available: keep reason and confidence.
                verdict = self._coerce_bool(parsed_json.get("is_complete", False))
                result = {
                    "问题": question,
                    "LLM回复": full_answer,
                    "完整性": "完整" if verdict else "不完整",
                    "原因": parsed_json.get("reason", "未提供"),
                    "置信度": parsed_json.get("confidence", 0)
                }
                counter_key = "complete" if verdict else "incomplete"
            else:
                # JSON parsing failed: keep only the raw reply.
                result = {
                    "问题": question,
                    "LLM回复": full_answer,
                    "完整性": "完整"
                }
                counter_key = "complete"

            with progress_lock:
                progress_counter[counter_key] += 1
            with complete_questions_lock:
                complete_questions.append(result)
        else:
            with progress_lock:
                progress_counter["incomplete"] += 1

        # Refresh the progress bar with the running totals.
        with progress_lock:
            progress_counter["processed"] += 1
            pbar.set_postfix(
                完整=progress_counter["complete"],
                不完整=progress_counter["incomplete"],
                完整率=f"{progress_counter['complete']/max(1, progress_counter['processed']):.1%}"
            )
            pbar.update(1)

    def _shorten_response(self, response):
        """
        Shorten an LLM response for display in an Excel cell.

        When a ```json block is present, the first 200 characters of the
        response plus that block are kept; otherwise the response is cut to
        500 characters.

        Args:
            response (str): Original LLM response.

        Returns:
            str: The shortened response.
        """
        json_match = re.search(r'```json\s*(.*?)\s*```', response, re.DOTALL)
        if json_match:
            json_part = json_match.group(0)
            prefix = response[:200] + "..." if len(response) > 200 else response
            return f"{prefix}\n\n{json_part}"
        return response[:500] + "..." if len(response) > 500 else response

    def _prepare_excel_dataframe(self, complete_questions):
        """
        Convert the result list into the DataFrame written to Excel.

        Args:
            complete_questions (list): List of result dicts.

        Returns:
            pandas.DataFrame: Results with shortened replies and the key
            columns ordered first.
        """
        result_df = pd.DataFrame(complete_questions)

        # Shorten the replies so that cells do not become huge.
        if "LLM回复" in result_df.columns:
            result_df["LLM回复"] = result_df["LLM回复"].apply(self._shorten_response)

        # Important columns first, then every remaining column.
        preferred = ["问题", "完整性", "置信度", "原因", "LLM回复"]
        column_order = [col for col in preferred if col in result_df.columns]
        column_order.extend(col for col in result_df.columns if col not in column_order)

        return result_df[column_order]

    def _set_excel_column_widths(self, worksheet):
        """
        Set the column widths based on the header text in row 1.

        Args:
            worksheet (openpyxl.worksheet.worksheet.Worksheet): Target sheet.
        """
        widths = {"问题": 40, "LLM回复": 60, "原因": 30, "完整性": 10, "置信度": 10}
        default_width = 15

        for col in range(1, worksheet.max_column + 1):
            col_letter = get_column_letter(col)
            column_name = worksheet[f"{col_letter}1"].value
            worksheet.column_dimensions[col_letter].width = widths.get(column_name, default_width)

    def _apply_excel_cell_styles(self, worksheet):
        """
        Apply borders, wrapping, header styling and verdict colours.

        Args:
            worksheet (openpyxl.worksheet.worksheet.Worksheet): Target sheet.

        Returns:
            openpyxl.styles.Border: The border style, reused for the
            statistics rows.
        """
        header_fill = PatternFill(start_color="DDEBF7", end_color="DDEBF7", fill_type="solid")
        header_font = Font(bold=True)
        wrap_alignment = Alignment(wrap_text=True, vertical="top")
        border = Border(
            left=Side(style='thin'),
            right=Side(style='thin'),
            top=Side(style='thin'),
            bottom=Side(style='thin')
        )
        # Build the verdict fills once instead of once per cell.
        complete_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
        incomplete_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")

        # Locate the completeness column(s) once from the header row.
        completeness_columns = {
            col for col in range(1, worksheet.max_column + 1)
            if worksheet.cell(row=1, column=col).value == "完整性"
        }

        for row in worksheet.iter_rows(min_row=1, max_row=worksheet.max_row, min_col=1, max_col=worksheet.max_column):
            for cell in row:
                cell.alignment = wrap_alignment
                cell.border = border

                if cell.row == 1:
                    # Header row gets its own fill and a bold font.
                    cell.fill = header_fill
                    cell.font = header_font
                elif cell.column in completeness_columns:
                    # Green for complete, red for anything else.
                    cell.fill = complete_fill if cell.value == "完整" else incomplete_fill

        return border

    def _add_statistics_to_excel(self, worksheet, complete_questions, total_rows, total_questions, border):
        """
        Append a statistics section below the result rows.

        Args:
            worksheet (openpyxl.worksheet.worksheet.Worksheet): Target sheet.
            complete_questions (list): List of result dicts.
            total_rows (int): Number of result rows written to the sheet.
            total_questions (int): Number of questions that were judged.
            border (openpyxl.styles.Border): Border style for the section.

        Returns:
            int: Number of complete questions.
        """
        complete_count = sum(1 for item in complete_questions if item.get("完整性") == "完整")
        # The sheet only contains complete questions, so the totals must come
        # from the number of judged questions; total_rows is the fallback when
        # that number is not available.
        total = total_questions if total_questions else total_rows
        incomplete_count = max(0, total - complete_count)

        worksheet.append([""])  # blank separator row

        stat_row = worksheet.max_row + 1
        worksheet.cell(row=stat_row, column=1, value="统计信息")
        worksheet.cell(row=stat_row, column=1).font = Font(bold=True)

        worksheet.cell(row=stat_row+1, column=1, value="总问题数")
        worksheet.cell(row=stat_row+1, column=2, value=total)

        worksheet.cell(row=stat_row+2, column=1, value="完整问题数")
        worksheet.cell(row=stat_row+2, column=2, value=complete_count)
        worksheet.cell(row=stat_row+2, column=2).fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")

        worksheet.cell(row=stat_row+3, column=1, value="不完整问题数")
        worksheet.cell(row=stat_row+3, column=2, value=incomplete_count)
        worksheet.cell(row=stat_row+3, column=2).fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")

        worksheet.cell(row=stat_row+4, column=1, value="完整问题比例")
        worksheet.cell(row=stat_row+4, column=2, value=f"{complete_count/total:.2%}" if total > 0 else "0%")

        # Draw the border around the two-column statistics section.
        for r in range(stat_row, stat_row+5):
            for c in range(1, 3):
                worksheet.cell(row=r, column=c).border = border

        return complete_count

    def save_results_to_excel(self, complete_questions, total_questions):
        """
        Write the results to the output Excel file and style the sheet.

        Args:
            complete_questions (list): List of result dicts.
            total_questions (int): Number of questions that were judged.
        """
        if not complete_questions:
            print("没有找到完整的问题。")
            return

        result_df = self._prepare_excel_dataframe(complete_questions)
        total_rows = len(result_df)

        # Write the raw data first, then reopen the workbook to style it.
        result_df.to_excel(self.output_path, index=False, engine='openpyxl')

        from openpyxl import load_workbook
        wb = load_workbook(self.output_path)
        ws = wb.active

        self._set_excel_column_widths(ws)
        border = self._apply_excel_cell_styles(ws)
        complete_count = self._add_statistics_to_excel(ws, complete_questions, total_rows, total_questions, border)

        wb.save(self.output_path)

        print(f"处理完成。共有{complete_count}/{total_questions}个完整问题被保存到 {self.output_path}")
        print(f"完整问题比例: {complete_count/total_questions:.2%}" if total_questions > 0 else "完整问题比例: 0%")

    def process_excel_file(self):
        """
        Judge every question in the input Excel file.

        Reads the configured column, judges the questions concurrently and
        saves the complete ones to the output Excel file.
        """
        if not os.path.exists(self.input_path):
            print(f"错误: 找不到Excel文件 '{self.input_path}'")
            return

        print(f"正在读取Excel文件: {self.input_path}")
        try:
            df = pd.read_excel(self.input_path)
        except Exception as e:
            print(f"读取Excel文件时出错: {e}")
            return

        if len(df.columns) <= self.column_index:
            print(f"错误: Excel文件没有足够的列,请求索引 {self.column_index},但只有 {len(df.columns)} 列")
            return

        target_col = df.columns[self.column_index]
        print(f"目标列名称: {target_col}")

        complete_questions = []
        total_questions = len(df)

        print(f"总共有{total_questions}个问题需要判断")

        # Shared state for the worker threads.
        complete_questions_lock = threading.Lock()
        progress_counter = {"processed": 0, "complete": 0, "incomplete": 0}
        progress_lock = threading.Lock()

        # Keep empty cells as None: str(NaN) would become the text "nan" and
        # be sent to the LLM as if it were a question.
        questions = [
            (i, None if pd.isna(value) else str(value), self.llm_client, total_questions)
            for i, value in df.iloc[:, self.column_index].items()
        ]

        start_time = time.time()

        print(f"开始处理问题,使用 {self.max_workers} 个并发线程...")
        with tqdm(total=total_questions, desc="处理问题", unit="问题") as pbar:
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                futures = [executor.submit(
                    self._process_question,
                    args,
                    complete_questions,
                    progress_counter,
                    progress_lock,
                    complete_questions_lock,
                    pbar
                ) for args in questions]

                # Wait for all tasks and surface worker exceptions, which
                # would otherwise be dropped silently.
                for future in concurrent.futures.as_completed(futures):
                    error = future.exception()
                    if error is not None:
                        print(f"处理问题时发生未捕获的异常: {error}")

        processing_time = time.time() - start_time
        # max(1, ...) avoids a ZeroDivisionError for an empty sheet.
        print(f"处理完成,耗时: {processing_time:.2f}秒,平均每问题: {processing_time/max(1, total_questions):.2f}秒")

        self.save_results_to_excel(complete_questions, total_questions)

    def test_single_question(self, question):
        """
        Judge a single question and print the result.

        Args:
            question (str): The question to test.
        """
        print(f"问题: {question}")
        print("正在调用LLM判断问题是否完整...")

        is_complete, full_answer = self.is_question_complete(question)
        parsed_json = self._extract_json_from_response(full_answer)

        print("\n==== LLM回复 ====")
        print(full_answer)
        print("================\n")

        if parsed_json:
            print(f"判断结果: {'完整' if parsed_json.get('is_complete', False) else '不完整'}")
            print(f"判断原因: {parsed_json.get('reason', '未提供')}")
            print(f"置信度: {parsed_json.get('confidence', 0)}%")
        else:
            print(f"判断结果: {'完整' if is_complete else '不完整'} (简单判断)")
            print("无法从回复中提取JSON结构化数据")
|
||||
|
||||
|
||||
def parse_arguments(argv=None):
    """
    Parse the command-line arguments.

    Args:
        argv (list[str], optional): Argument strings to parse. When None,
            argparse reads them from ``sys.argv``.

    Returns:
        argparse.Namespace: Parsed arguments with the attributes ``input``,
        ``output``, ``workers``, ``column`` and ``test``.
    """
    def _positive_int(text):
        # A thread pool needs at least one worker; rejecting bad values here
        # gives a clear usage error instead of a late ValueError.
        value = int(text)
        if value < 1:
            raise argparse.ArgumentTypeError(f"必须是正整数: {text}")
        return value

    parser = argparse.ArgumentParser(description='判断Excel文件中的问题是否完整')
    parser.add_argument('-i', '--input', type=str, default=DEFAULT_EXCEL_PATH,
                        help=f'输入Excel文件路径 (默认: {DEFAULT_EXCEL_PATH})')
    parser.add_argument('-o', '--output', type=str, default=DEFAULT_OUTPUT_PATH,
                        help=f'输出Excel文件路径 (默认: {DEFAULT_OUTPUT_PATH})')
    parser.add_argument('-w', '--workers', type=_positive_int, default=DEFAULT_MAX_WORKERS,
                        help=f'并发处理的最大线程数 (默认: {DEFAULT_MAX_WORKERS})')
    parser.add_argument('-c', '--column', type=int, default=0,
                        help='要处理的问题所在列的索引 (默认: 0,即第一列)')
    parser.add_argument('-t', '--test', type=str,
                        help='测试单个问题,不处理Excel文件')
    return parser.parse_args(argv)
|
||||
|
||||
|
||||
def main():
    """Entry point: test one question or process the whole Excel file."""
    cli = parse_arguments()

    # Build the judge from the command-line options.
    judge = QueryCompletenessJudge(
        input_path=cli.input,
        output_path=cli.output,
        max_workers=cli.workers,
        column_index=cli.column
    )

    if not cli.test:
        # Default mode: batch-process the Excel file.
        judge.process_excel_file()
        return

    # Single-question mode: print the verdict and skip the Excel file.
    judge.test_single_question(cli.test)


if __name__ == "__main__":
    main()
|
||||
|
||||
|
||||
@@ -2,27 +2,39 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
from rag2_0.dify.dify_client import DifyClient
|
||||
from rag2_0.dify.dify_tool import NewWorkflowChat, OldWorkFlowChat
|
||||
import sys
|
||||
import argparse
|
||||
from threading import Lock
|
||||
import pandas as pd
|
||||
# 使用线程池并发执行
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from tqdm import tqdm
|
||||
from rag2_0.dify.dify_tool import DifyTool
|
||||
import json
|
||||
from urllib.parse import unquote
|
||||
from rag2_0.tool.WikijsTool import WikijsTool
|
||||
from rag2_0.tool.html_to_md import convert_html_to_md
|
||||
from rag2_0.tool.ModelTool import OpenAiLLM
|
||||
from dotenv import load_dotenv
|
||||
from pydantic import BaseModel, Field
|
||||
from langchain.output_parsers import PydanticOutputParser
|
||||
from threading import Lock
|
||||
import sys
|
||||
import argparse
|
||||
|
||||
sys.path.append(os.getcwd())
|
||||
from rag2_0.dify.dify_client import DifyClient
|
||||
from rag2_0.dify.dify_tool import NewWorkflowChat, OldWorkFlowChat
|
||||
from rag2_0.tool.WikijsTool import WikijsTool
|
||||
from rag2_0.tool.html_to_md import convert_html_to_md
|
||||
from rag2_0.tool.ModelTool import OpenAiLLM
|
||||
from rag2_0.dify.dify_tool import DifyTool
|
||||
|
||||
load_dotenv()
|
||||
|
||||
import logging
|
||||
# 配置日志
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.StreamHandler()
|
||||
]
|
||||
)
|
||||
|
||||
class ContentSource(BaseModel):
|
||||
score:int = Field(description="相关性分数")
|
||||
reason:str = Field(description="评分理由")
|
||||
@@ -32,8 +44,7 @@ class DifyComparisonTester:
|
||||
Dify新旧流程对比测试类,用于比较两个不同流程的问答效果并进行评判
|
||||
"""
|
||||
def __init__(self, excel_path:str, baseurl:str, new_workflow_api_key:str,
|
||||
old_workflow_api_key:str=None, wiki_excel_path:str=None,
|
||||
output_path:str=None, max_workers:int=1, mode:str="both"):
|
||||
old_workflow_api_key:str=None, output_path:str=None, max_workers:int=1, mode:str="both"):
|
||||
"""
|
||||
初始化对比测试器
|
||||
|
||||
@@ -42,7 +53,6 @@ class DifyComparisonTester:
|
||||
baseurl: Dify API的基础URL
|
||||
new_workflow_api_key: 新流程的API密钥
|
||||
old_workflow_api_key: 旧流程的API密钥,仅在mode="both"时需要
|
||||
wiki_excel_path: Wiki Excel文件路径,用于获取标准答案
|
||||
output_path: 输出Excel文件路径
|
||||
max_workers: 最大工作线程数
|
||||
mode: 测试模式,"new_only"表示仅测试新对话,"both"表示测试新老对话
|
||||
@@ -64,8 +74,8 @@ class DifyComparisonTester:
|
||||
self.results_lock = Lock()
|
||||
|
||||
# 读取Wiki Excel文件
|
||||
if wiki_excel_path and os.path.exists(wiki_excel_path):
|
||||
self.wiki_excel = pd.read_excel(wiki_excel_path)
|
||||
if excel_path and os.path.exists(excel_path):
|
||||
self.wiki_excel = pd.read_excel(excel_path)
|
||||
else:
|
||||
self.wiki_excel = None
|
||||
|
||||
@@ -78,13 +88,13 @@ class DifyComparisonTester:
|
||||
"""
|
||||
self.dify_tool.close_connection()
|
||||
|
||||
def get_llm(self):
|
||||
def get_llm(self, **kwargs):
|
||||
api_key = os.getenv("OPENAI_API_KEY")
|
||||
base_url = os.getenv("OPENAI_API_BASE")
|
||||
model = os.getenv("LLM_MODEL_NAME")
|
||||
return OpenAiLLM(api_key=api_key, base_url=base_url, model=model)
|
||||
return OpenAiLLM(api_key=api_key, base_url=base_url, model=model, **kwargs)
|
||||
|
||||
def find_wiki_link(self, query) -> str | None:
|
||||
def find_wiki_link(self, row) -> str | None:
|
||||
"""
|
||||
根据查询找出对应的词条链接
|
||||
|
||||
@@ -94,30 +104,11 @@ class DifyComparisonTester:
|
||||
Returns:
|
||||
str: 对应的词条链接,如果没有找到则返回None
|
||||
"""
|
||||
# 确保query不为空
|
||||
if not query or pd.isna(query):
|
||||
return None
|
||||
if self.wiki_excel is None:
|
||||
return None
|
||||
|
||||
# 在"新提问"列中查找匹配的行
|
||||
matched_rows = self.wiki_excel[self.wiki_excel['新提问'] == query]
|
||||
|
||||
# 如果找到了匹配的行,返回对应的词条链接
|
||||
if not matched_rows.empty:
|
||||
return matched_rows.iloc[0]['对应词条链接']
|
||||
|
||||
# 如果没有完全匹配,尝试部分匹配
|
||||
# 去除软件名称部分(如果有)
|
||||
query_parts = query.split(',', 1)
|
||||
if len(query_parts) > 1:
|
||||
clean_query = query_parts[1].strip()
|
||||
|
||||
# 在"提问"列中查找包含清理后查询的行
|
||||
for idx, row in self.wiki_excel.iterrows():
|
||||
if pd.notna(row['提问']) and clean_query in row['提问']:
|
||||
return row['对应词条链接']
|
||||
|
||||
if "词条链接" in row:
|
||||
return row["词条链接"]
|
||||
return None
|
||||
|
||||
def get_wiki_content(self, link) -> str:
|
||||
@@ -191,7 +182,7 @@ class DifyComparisonTester:
|
||||
Returns:
|
||||
str: 格式化的prompt
|
||||
"""
|
||||
return f"""请作为一个专业的答案评判专家,评估以下回答与标准答案的匹配程度。
|
||||
return f"""请作为一个电力造价行业的专家,评估以下回答与标准答案的匹配程度。
|
||||
|
||||
标准答案:
|
||||
{standard_answer}
|
||||
@@ -199,11 +190,20 @@ class DifyComparisonTester:
|
||||
待评估的回答:
|
||||
{answer_to_check}
|
||||
|
||||
请仔细分析两个答案的内容,并给出你的判断。只需要回答"正确"或"错误",不需要其他解释。
|
||||
如果待评估的回答与标准答案在核心内容和关键信息(步骤)上一致,即使表达方式不同,也应判定为"正确"。
|
||||
如果待评估的回答存在明显的错误信息或重要信息缺失,应判定为"错误"。
|
||||
|
||||
请严格按以下格式输出:【正确】或【错误】:"""
|
||||
要求
|
||||
1、分析待评估的回答与标准答案的匹配程度(包括内容、步骤、主体等)
|
||||
2、如果待评估的回答与标准答案在核心内容和关键信息(步骤)上一致,即使表达方式不同,也应判定为"正确"。
|
||||
3、只要大体描述一致,即使缺失了一些步骤,也应判定为"正确"。
|
||||
3、如果待评估的回答存在明显的错误信息,应判定为"错误"。
|
||||
4、请严格按json格式输出:
|
||||
{{
|
||||
"result": True or False,
|
||||
"reason": "简明扼要的理由(中文)"
|
||||
}}
|
||||
字段说明:
|
||||
result: True or False,待评估的回答是否正确
|
||||
reason: 简明扼要的理由(中文)
|
||||
"""
|
||||
|
||||
def judge_answer(self, standard_answer: str, answer: str) -> bool | None:
|
||||
"""
|
||||
@@ -218,10 +218,11 @@ class DifyComparisonTester:
|
||||
"""
|
||||
|
||||
prompt = self.create_correctness_prompt(standard_answer, answer)
|
||||
llm = self.get_llm()
|
||||
llm = self.get_llm(response_format={"type": "json_object"})
|
||||
try:
|
||||
response = llm.invoke(user_prompt=prompt, need_retry=True)
|
||||
return "正确" in response.content
|
||||
response_json = json.loads(response.content)
|
||||
return response_json["result"]
|
||||
except Exception as e:
|
||||
return None
|
||||
|
||||
@@ -513,10 +514,10 @@ content: "{content}"
|
||||
|
||||
return old_result, new_result
|
||||
except Exception as e:
|
||||
print(f"处理问题 '{q}' 时发生错误: {str(e)}")
|
||||
logging.error(f"处理问题 '{q}' 时发生错误: {str(e)}", exc_info=True)
|
||||
return None, None
|
||||
|
||||
def process_question_with_judge(self, q:str):
|
||||
def process_question_with_judge(self, q:str, row):
|
||||
"""
|
||||
处理单个问题,获取新旧流程的回答并进行评判
|
||||
|
||||
@@ -537,7 +538,7 @@ content: "{content}"
|
||||
new_answer = future_new["新流程答案"]
|
||||
|
||||
# 获取词条链接和标准答案
|
||||
wiki_url = self.find_wiki_link(query)
|
||||
wiki_url = self.find_wiki_link(row)
|
||||
standard_answer = ""
|
||||
answer_title = ""
|
||||
|
||||
@@ -546,7 +547,7 @@ content: "{content}"
|
||||
standard_answer = self.get_wiki_content(wiki_url)
|
||||
answer_title = self.get_wiki_title(wiki_url)
|
||||
except Exception as e:
|
||||
print(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}")
|
||||
logging.error(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}", exc_info=True)
|
||||
|
||||
# 判断答案正确性
|
||||
judge_result = ""
|
||||
@@ -563,7 +564,7 @@ content: "{content}"
|
||||
"问题分类": future_new["新问题分类"],
|
||||
"槽点信息": future_new["槽点信息"],
|
||||
"新流程答案": new_answer,
|
||||
"回答判断": judge_result,
|
||||
"回答是否正确": judge_result,
|
||||
"答案词条": answer_title if answer_title else "",
|
||||
"检索词条": future_new["新检索词条"],
|
||||
}
|
||||
@@ -576,7 +577,7 @@ content: "{content}"
|
||||
new_answer = future_new["新流程答案"]
|
||||
|
||||
# 获取词条链接和标准答案
|
||||
wiki_url = self.find_wiki_link(query)
|
||||
wiki_url = self.find_wiki_link(row)
|
||||
standard_answer = ""
|
||||
answer_title = ""
|
||||
|
||||
@@ -585,7 +586,7 @@ content: "{content}"
|
||||
standard_answer = self.get_wiki_content(wiki_url)
|
||||
answer_title = self.get_wiki_title(wiki_url)
|
||||
except Exception as e:
|
||||
print(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}")
|
||||
logging.error(f"处理问题 '{query}' 获取标准答案时发生错误: {str(e)}", exc_info=True)
|
||||
|
||||
# 判断答案正确性
|
||||
if standard_answer:
|
||||
@@ -630,25 +631,25 @@ content: "{content}"
|
||||
if row['回答中的软件名称'] == "未知" and row['提问中的软件名称'] == "未知":
|
||||
continue
|
||||
if row['提问中的软件名称'] != "未知":
|
||||
questions.append(row['提问'])
|
||||
questions.append((row['提问'],row))
|
||||
else:
|
||||
questions.append(f"{row['回答中的软件名称']}, {row['提问']}")
|
||||
questions.append((f"{row['回答中的软件名称']}, {row['提问']}",row))
|
||||
else:
|
||||
questions.append(row['提问'])
|
||||
questions.append((row['提问'], row))
|
||||
|
||||
results = []
|
||||
is_debug = hasattr(sys, 'gettrace') and sys.gettrace() is not None
|
||||
if not is_debug:
|
||||
# 使用多线程并发处理问题
|
||||
print("并发数量: ", self.max_workers)
|
||||
print("问题数量: ", len(questions))
|
||||
logging.info(f"并发数量: {self.max_workers}")
|
||||
logging.info(f"问题数量: {len(questions)}")
|
||||
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
|
||||
# 创建进度条
|
||||
with tqdm(total=len(questions), desc="处理问题进度") as pbar:
|
||||
# 提交所有任务
|
||||
futures = []
|
||||
for q in questions:
|
||||
future = executor.submit(self.process_question_with_judge, q)
|
||||
for q, row in questions:
|
||||
future = executor.submit(self.process_question_with_judge, q, row)
|
||||
futures.append(future)
|
||||
|
||||
# 处理结果
|
||||
@@ -659,9 +660,9 @@ content: "{content}"
|
||||
results.append(result)
|
||||
pbar.update(1)
|
||||
else:
|
||||
for q in questions:
|
||||
result = self.process_question_with_judge(q)
|
||||
print(json.dumps(result,ensure_ascii=False,indent=2))
|
||||
for q, row in questions:
|
||||
result = self.process_question_with_judge(q, row)
|
||||
logging.info(json.dumps(result,ensure_ascii=False,indent=2))
|
||||
if result is not None:
|
||||
results.append(result)
|
||||
|
||||
@@ -687,24 +688,29 @@ content: "{content}"
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 创建命令行参数解析器
|
||||
os.environ["DIFY_BASEURL"] = "http://10.1.16.39/v1"
|
||||
os.environ["DIFY_NEW_API_KEY"] = "app-qxsSybCs7ABiKlC1JabTYVn6"
|
||||
os.environ["DIFY_OLD_API_KEY"] = "app-wUdkWJx5zeOvmvBUZizMoSw3"
|
||||
|
||||
os.environ["DIFY_PG_HOST"] = "10.1.16.39"
|
||||
os.environ["DIFY_PG_PORT"] = "5432"
|
||||
os.environ["DIFY_PG_USER"] = "postgres"
|
||||
os.environ["DIFY_PG_PASSWORD"] = "difyai123456"
|
||||
os.environ["DIFY_PG_DATABASE"] = "dify"
|
||||
|
||||
default_excel_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/历史提问数据(like)_提问明确.xlsx")
|
||||
default_wiki_excel_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/部分提问_软件名称明确.xlsx")
|
||||
default_excel_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".." ,"data/excel/740条(dislike)_存在标准词条.xlsx")
|
||||
parser = argparse.ArgumentParser(description='Dify对话测试工具')
|
||||
parser.add_argument('--mode', type=str, choices=['new_only', 'both'], default='new_only',
|
||||
help='测试模式: new_only表示仅测试新对话, both表示测试新老对话')
|
||||
parser.add_argument('--excel_path', type=str,
|
||||
default=default_excel_path,
|
||||
help='包含问题的Excel文件路径')
|
||||
parser.add_argument('--baseurl', type=str, default="http://172.20.0.145/v1",
|
||||
parser.add_argument('--baseurl', type=str, default=os.getenv("DIFY_BASEURL"),
|
||||
help='Dify API的基础URL')
|
||||
parser.add_argument('--new_api_key', type=str, default="app-qxsSybCs7ABiKlC1JabTYVn6",
|
||||
parser.add_argument('--new_api_key', type=str, default=os.getenv("DIFY_NEW_API_KEY"),
|
||||
help='新流程的API密钥')
|
||||
parser.add_argument('--old_api_key', type=str, default="app-wUdkWJx5zeOvmvBUZizMoSw3",
|
||||
parser.add_argument('--old_api_key', type=str, default=os.getenv("DIFY_OLD_API_KEY"),
|
||||
help='旧流程的API密钥')
|
||||
parser.add_argument('--wiki_excel_path', type=str,
|
||||
default=default_wiki_excel_path,
|
||||
help='Wiki Excel文件路径,用于获取标准答案')
|
||||
parser.add_argument('--output_path', type=str, default=None,
|
||||
help='输出Excel文件路径')
|
||||
parser.add_argument('--max_workers', type=int, default=5,
|
||||
@@ -715,7 +721,7 @@ if __name__ == "__main__":
|
||||
|
||||
# 检查Excel文件是否存在
|
||||
if not os.path.exists(args.excel_path):
|
||||
print(f"错误:Excel文件不存在: {args.excel_path}")
|
||||
logging.error(f"错误:Excel文件不存在: {args.excel_path}", exc_info=True)
|
||||
exit(1)
|
||||
|
||||
# 创建测试器并运行
|
||||
@@ -724,7 +730,6 @@ if __name__ == "__main__":
|
||||
baseurl=args.baseurl,
|
||||
new_workflow_api_key=args.new_api_key,
|
||||
old_workflow_api_key=args.old_api_key if args.mode == "both" else None,
|
||||
wiki_excel_path=args.wiki_excel_path,
|
||||
output_path=args.output_path,
|
||||
max_workers=args.max_workers,
|
||||
mode=args.mode
|
||||
@@ -732,4 +737,4 @@ if __name__ == "__main__":
|
||||
|
||||
# 运行对比测试(带评判)
|
||||
output_file = tester.run_comparison(with_judge=True)
|
||||
print(f"测试结果已保存至: {output_file}")
|
||||
logging.info(f"测试结果已保存至: {output_file}")
|
||||
|
||||
+13
-12
@@ -39,11 +39,11 @@ class PgSql:
|
||||
try:
|
||||
# 连接数据库
|
||||
self.connection = psycopg2.connect(
|
||||
user="postgres",
|
||||
password="difyai123456",
|
||||
host="172.20.0.145",
|
||||
port=5432,
|
||||
database="dify"
|
||||
user=os.getenv("DIFY_PG_USER"),
|
||||
password=os.getenv("DIFY_PG_PASSWORD"),
|
||||
host=os.getenv("DIFY_PG_HOST"),
|
||||
port=os.getenv("DIFY_PG_PORT"),
|
||||
database=os.getenv("DIFY_PG_DATABASE")
|
||||
)
|
||||
|
||||
except (Exception, psycopg2.Error) as error:
|
||||
@@ -160,11 +160,11 @@ class PgSql:
|
||||
""",
|
||||
(workflow_run_id,)
|
||||
)
|
||||
result = cursor.fetchall()
|
||||
if result:
|
||||
colnames = [desc[0] for desc in cursor.description]
|
||||
return [dict(zip(colnames, row)) for row in result]
|
||||
return None
|
||||
result = cursor.fetchall()
|
||||
if result:
|
||||
colnames = [desc[0] for desc in cursor.description]
|
||||
return [dict(zip(colnames, row)) for row in result]
|
||||
return None
|
||||
except (Exception, psycopg2.Error) as error:
|
||||
raise Exception(f"Error while getting workflow_node_executions_info: {error}")
|
||||
|
||||
@@ -263,7 +263,8 @@ class BaseWorkflowChat:
|
||||
析构函数,在对象被销毁时自动关闭数据库连接。
|
||||
确保在对象生命周期结束时释放数据库资源。
|
||||
"""
|
||||
self.dify_tool.close_connection()
|
||||
# DifyTool类已经在其__del__方法中关闭了数据库连接,无需在此重复调用
|
||||
pass
|
||||
|
||||
def create_chat_message(self, query: str):
|
||||
"""
|
||||
@@ -464,7 +465,7 @@ class NewWorkflowChat(BaseWorkflowChat):
|
||||
elif workflow_node["title"] == "提取处理后的知识":
|
||||
outputs = json.loads(workflow_node["outputs"])["knowledge_list"]
|
||||
retrieve_title, max_score, min_score, avg_score = self.get_retrieve_info(query=query, outputs=outputs, reranker_sorce_info=reranker_sorce)
|
||||
elif workflow_node["title"] == "问题优化结果解析":
|
||||
elif workflow_node["title"] == "意图识别结果解析":
|
||||
outputs = json.loads(workflow_node["outputs"])
|
||||
rewrite_query = outputs["optimize_query"]
|
||||
llm_result_json = json.loads(workflow_node['inputs'])["llm_result"]
|
||||
|
||||
@@ -62,8 +62,13 @@ def intent_recognize():
|
||||
|
||||
# 获取单例实例并使用线程锁保护关键操作
|
||||
recognizer = RecognizerSingleton.get_instance()
|
||||
result = recognizer.process_query(query, conversation_context, chat_history, previous_slots)
|
||||
|
||||
result = recognizer.process_query(query=query,
|
||||
conversation_context=conversation_context,
|
||||
chat_history=chat_history,
|
||||
previous_slots=previous_slots,
|
||||
use_jieba=False,
|
||||
enable_query_expansion=True)
|
||||
|
||||
end_time = time.time()
|
||||
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S %z")
|
||||
logger.info(f"[{os.getpid()}] 意图识别耗时: {end_time - start_time:.2f}秒")
|
||||
|
||||
Reference in New Issue
Block a user