优化DifyCompareTest和WorkorderToDify模块,调整日志记录格式,修复API密钥获取方式,增强工单处理流程,添加元数据管理功能,改进并发上传逻辑,更新文档处理方式。

This commit is contained in:
2025-07-28 08:34:17 +08:00
parent 780f423200
commit ba42107999
3 changed files with 356 additions and 64 deletions
+22 -15
View File
@@ -26,7 +26,7 @@ if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 生成带时间戳的日志文件名
log_file = os.path.join(log_dir, f'dify_compare_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
log_file = os.path.join(log_dir, f'dify_compare_{datetime.now().strftime("%Y%m%d")}.log')
import logging
@@ -43,7 +43,7 @@ logging.basicConfig(
class DifyCompareTest:
def __init__(self):
# 词条与工单同时检索
self.both_wiki_worker_client = ChatClient(api_key="app-CPoOMaGDsLRPAe9TW7Xjhszy", base_url=os.getenv("DIFY_BSAE_URL"))
self.both_wiki_worker_client = ChatClient(api_key=os.getenv("DIFY_APP_KEY"), base_url=os.getenv("DIFY_BSAE_URL"))
self.llm = OpenAiLLM(base_url=os.getenv("OPENAI_API_BASE"), model=os.getenv("MODEL_NAME"))
def llm_judge_answer(self, old_answer: str, now_answer: str):
@@ -100,22 +100,31 @@ class DifyCompareTest:
answer = result.get('answer', "")
if len(answer) == 0:
raise Exception(f"回答为空: {result}")
if old_answer:
judge_result = self.llm_judge_answer(old_answer=old_answer, now_answer=answer)
return answer, judge_result
else:
judge_result=""
# 只取回答的前半部分
answer = answer.split("----------------------------------------")[0].strip()
message_id = result.get('message_id', "")
return answer, judge_result, message_id
except Exception as e:
retry_count += 1
if retry_count >= max_retries:
logging.error(f"词条与工单同时检索调用失败 (尝试 {max_retries} 次后): {e}")
return '', ''
return '', '', ''
else:
import time
time.sleep(1) # 等待1秒后重试
time.sleep(10) # 等待1秒后重试
def process_single_row(self, index, row):
"""处理单行数据的方法"""
try:
query = row["提问"]
if "回答" in row:
old_answer = row["回答"]
else:
old_answer = ""
current_software = row["当前软件"]
inputs = {
@@ -124,7 +133,7 @@ class DifyCompareTest:
}
# 调用词条与工单同时检索工作流
answer, judge_result = self.process_workflow(
answer, judge_result, message_id = self.process_workflow(
self.both_wiki_worker_client,
inputs,
query,
@@ -133,17 +142,17 @@ class DifyCompareTest:
# 构建结果
result_row = row.copy()
result_row["词条与工单同时回答"] = answer
result_row["词条与工单同时回答对比"] = judge_result
result_row["message_id"] = message_id
result_row["回答"] = answer
# result_row["词条与工单同时回答对比"] = judge_result
logging.info(f"成功处理第 {index + 1} 行数据")
return index, result_row
except Exception as e:
logging.error(f"处理第 {index + 1} 行数据时出错: {e}")
result_row = row.copy()
result_row["词条与工单同时回答"] = ''
result_row["词条与工单同时回答对比"] = ''
result_row["回答"] = ''
result_row["message_id"] = ''
return index, result_row
@@ -166,7 +175,7 @@ class DifyCompareTest:
logging.info(f"成功读取Excel文件: {excel_path}, 共 {len(df)} 行数据")
# 验证必要的列是否存在
required_columns = ["提问", "回答", "当前软件"]
required_columns = ["提问", "当前软件"]
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
logging.error(f"Excel文件缺少必要的列: {missing_columns}")
@@ -199,8 +208,6 @@ class DifyCompareTest:
logging.error(f"线程执行失败 (行{original_index + 1}): {e}")
# 添加失败的行
result_row = df.iloc[original_index].copy()
result_row["词条与工单同时回答"] = '线程执行失败'
result_row["词条与工单同时回答对比"] = '线程执行失败'
results[original_index] = result_row
pbar.update(1)
@@ -223,7 +230,7 @@ if __name__ == "__main__":
# 处理第一个文件
excel_files = [
# ("data/excel/5月.xlsx", "data/excel/5月问答对比.xlsx"),
("data/excel/其他月.xlsx", "data/excel/其他月问答对比.xlsx")
("data/excel/第四轮问题-Part2.xlsx", "data/excel/第四轮问题-Part2-问答测试.xlsx")
]
for excel_path, save_path in excel_files:
+230 -33
View File
@@ -1,50 +1,247 @@
import os
import sys
import datetime
import logging
import concurrent.futures
import threading
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.StreamHandler(),
logging.FileHandler(f'data/logs/WorkorderToDify_{datetime.datetime.now().strftime("%Y%m%d")}.log', encoding='utf-8')
]
)
sys.path.append(os.getcwd())
import rag2_0.dify.dify_client.dify_api as DifyApi
import pandas as pd
pd_data = pd.read_excel("data/excel/工单汇总(给AI)_工单拆分.xlsx")
dify_api = DifyApi.DifyApi()
peiwang_dataset_id = dify_api.get_or_create_dataset_by_name("配网工单数据")
zhuwang_dataset_id = dify_api.get_or_create_dataset_by_name("主网工单数据")
jianga_dataset_id = dify_api.get_or_create_dataset_by_name("技改工单数据")
chuneng_dataset_id = dify_api.get_or_create_dataset_by_name("储能工单数据")
xizang_dataset_id = dify_api.get_or_create_dataset_by_name("西藏工单数据")
class WorkorderToDify:
def __init__(self, excel_path="data/excel/2025.1-6月工单(人工整理后).xlsx"):
self.pd_data = pd.read_excel(excel_path)
self.dify_api = DifyApi.DifyApi()
self.dataset_ids = {}
self.skill_group_data = {}
self.metadata_ids = {} # 用于缓存元数据ID
# 初始化各技能组的数据集ID
self.dataset_ids["博微配网计价通D3"] = self.dify_api.get_or_create_dataset_by_name("配网工单数据")
self.dataset_ids["博微电力建设计价通软件"] = self.dify_api.get_or_create_dataset_by_name("主网工单数据")
self.dataset_ids["博微技改检修计价通T1软件"] = self.dify_api.get_or_create_dataset_by_name("技改工单数据")
self.dataset_ids["新能源系列"] = self.dify_api.get_or_create_dataset_by_name("储能工单数据")
self.dataset_ids["博微西藏计价通Z1"] = self.dify_api.get_or_create_dataset_by_name("西藏工单数据")
self.dataset_ids["通用"] = self.dify_api.get_or_create_dataset_by_name("通用工单数据")
soft_segments_list={}
for index, row in pd_data.iterrows():
def check_and_create_metadata(self):
"""检查并创建workorder_time元数据,同时缓存元数据ID"""
for skill_group, dataset_id in self.dataset_ids.items():
# 获取当前数据集的所有元数据
metadata_info = self.dify_api.get_dataset_metadata(dataset_id)
metadata_list = metadata_info['doc_metadata']
# 查找workorder_time元数据
workorder_time_id = None
has_workorder_time = False
for metadata in metadata_list:
if metadata.get("name") == "workorder_time":
has_workorder_time = True
workorder_time_id = metadata.get("id")
break
# 如果不存在,则创建
if not has_workorder_time:
metadata = self.dify_api.create_dataset_metadata(dataset_id, "string", "workorder_time")
if metadata and "id" in metadata:
workorder_time_id = metadata["id"]
# 缓存元数据ID
if workorder_time_id:
self.metadata_ids[skill_group] = {"workorder_time": workorder_time_id}
else:
logging.error(f"无法获取或创建 {skill_group} 的 workorder_time 元数据ID")
def classify_workorders(self):
"""按技能组分类工单"""
logging.info("开始按技能组分类工单")
total_count = len(self.pd_data)
processed_count = 0
error_count = 0
for index, row in self.pd_data.iterrows():
try:
query = row["客户问题"]
answer = row["解决方案"]
skill_group = row["技能组"]
skill_group = row["产品线"]
if skill_group=="" or pd.isna(skill_group):
skill_group="通用"
create_time = row["创建时间"] # 2025-07-22 15:00:35
if isinstance(create_time, str):
try:
# 尝试原始格式 %Y-%m-%d %H:%M:%S
create_time = datetime.datetime.strptime(create_time, "%Y-%m-%d %H:%M:%S")
except ValueError:
try:
# 尝试格式 %Y/%m/%d %H:%M
create_time = datetime.datetime.strptime(create_time, "%Y/%m/%d %H:%M")
except ValueError:
# 如果仍然失败,记录错误并尝试其他可能的格式
try:
# 尝试格式 %Y/%m/%d
create_time = datetime.datetime.strptime(create_time, "%Y/%m/%d")
except ValueError:
raise ValueError(f"创建时间格式错误: {create_time}")
if not isinstance(create_time, datetime.datetime):
raise ValueError(f"创建时间格式错误: {create_time}")
conversation_id = row["会话id"]
content = f"问题:{query}\n回答:{answer}"
if skill_group not in soft_segments_list:
soft_segments_list[skill_group]=[]
soft_segments_list[skill_group].append({
"content": str(content),
"answer": "",
"keywords": []
if skill_group not in self.skill_group_data:
self.skill_group_data[skill_group] = []
self.skill_group_data[skill_group].append({
"document_name": query,
"content": content,
"create_time": create_time,
"conversation_id": conversation_id
})
for skill_group, segments_list in soft_segments_list.items():
if skill_group == "配网":
dataset_id = peiwang_dataset_id
elif skill_group == "主网":
dataset_id = zhuwang_dataset_id
elif skill_group == "技改":
dataset_id = jianga_dataset_id
elif skill_group == "储能":
dataset_id = chuneng_dataset_id
elif skill_group == "西藏":
dataset_id = xizang_dataset_id
else:
processed_count += 1
if processed_count % 100 == 0:
logging.info(f"已处理 {processed_count}/{total_count} 条工单")
except Exception as e:
error_count += 1
logging.error(f"处理第 {index} 行工单时出错: {str(e)}")
logging.error(f"错误工单内容: {row.to_dict()}")
continue
document_id = dify_api.get_document_id(dataset_id=dataset_id, document_name=f"{skill_group}工单数据")
if not document_id:
document_id = dify_api.upload_text_to_document(text_name=f"{skill_group}工单数据", text="", dataset_id=dataset_id)
dify_api.add_document_segments(dataset_id=dataset_id, document_id=document_id, segments_list=segments_list)
logging.info(f"工单分类完成,共处理 {processed_count} 条,错误 {error_count}")
for skill_group, data in self.skill_group_data.items():
logging.info(f"技能组 {skill_group}: {len(data)} 条工单")
def deduplicate_workorders(self):
"""对每个技能组内的工单进行去重,保留时间最新的"""
logging.info("开始对工单进行去重处理")
for skill_group in self.skill_group_data:
logging.info(f"处理技能组: {skill_group}, 去重前工单数量: {len(self.skill_group_data[skill_group])}")
# 创建一个临时字典,用于存储每个客户问题的最新工单
latest_workorders = {}
for workorder in self.skill_group_data[skill_group]:
query = workorder["document_name"]
create_time = workorder["create_time"]
# 如果该问题尚未在字典中或当前工单的时间比已有的更新
if query not in latest_workorders or create_time > latest_workorders[query]["datetime"]:
latest_workorders[query] = {
"workorder": workorder,
"datetime": create_time
}
logging.debug(f"更新工单: {query}, 时间: {create_time}")
# 用去重后的工单列表替换原列表
self.skill_group_data[skill_group] = [item["workorder"] for item in latest_workorders.values()]
logging.info(f"技能组 {skill_group} 去重完成, 去重后工单数量: {len(self.skill_group_data[skill_group])}")
logging.info("所有技能组工单去重处理完成")
def upload_workorders(self):
"""上传每个技能组的工单作为独立文档"""
logging.info("开始上传工单文档")
total_docs = sum(len(docs) for docs in self.skill_group_data.values())
processed_count = 0
error_count = 0
# 创建线程锁,用于保护计数器更新
lock = threading.Lock()
# 创建一个线程池
max_workers = min(20, os.cpu_count() * 5) # 最多20个线程,或者CPU核心数的5倍
logging.info(f"创建线程池,最大线程数: {max_workers}")
def upload_document(args):
skill_group, doc, dataset_id, workorder_time_id = args
nonlocal processed_count, error_count
try:
document_id = self.dify_api.get_document_id(dataset_id=dataset_id, document_name=doc["document_name"])
if document_id:
# 如果文档已存在,先删除
self.dify_api.del_document_by_id(dataset_id=dataset_id, document_id=document_id)
# 上传文档
document_id = self.dify_api.upload_text_to_document(
text_name=doc["document_name"],
text=doc["content"],
dataset_id=dataset_id
)
create_time_str = doc["create_time"].strftime("%Y-%m-%d %H:%M")
# 上传成功后,添加创建时间作为元数据
if document_id:
metadata_list = [
{
"id": workorder_time_id,
"name": "workorder_time",
"value": create_time_str
}
]
self.dify_api.add_document_metadata(dataset_id, document_id, metadata_list)
with lock:
processed_count += 1
if processed_count % 10 == 0 or processed_count == total_docs:
logging.info(f"已上传 {processed_count}/{total_docs} 个文档")
return True
except Exception as e:
with lock:
error_count += 1
logging.error(f"上传文档 '{doc['document_name']}' 失败: {str(e)}")
return False
# 准备上传任务列表
upload_tasks = []
for skill_group, documents in self.skill_group_data.items():
if skill_group not in self.dataset_ids:
logging.warning(f"技能组 '{skill_group}' 没有对应的数据集ID,跳过上传")
continue
dataset_id = self.dataset_ids[skill_group]
# 检查是否有缓存的元数据ID
if skill_group not in self.metadata_ids or "workorder_time" not in self.metadata_ids[skill_group]:
logging.error(f"未找到 {skill_group} 的 workorder_time 元数据ID,跳过上传")
continue
workorder_time_id = self.metadata_ids[skill_group]["workorder_time"]
# 为每个工单创建独立文档
for doc in documents:
upload_tasks.append((skill_group, doc, dataset_id, workorder_time_id))
# 使用线程池并发执行上传任务
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(upload_document, upload_tasks))
success_count = sum(1 for result in results if result)
logging.info(f"工单上传完成,总计: {total_docs},成功: {success_count},失败: {error_count}")
def process(self):
"""执行完整的工单处理流程"""
self.check_and_create_metadata()
self.classify_workorders()
self.deduplicate_workorders()
self.upload_workorders()
if __name__ == "__main__":
processor = WorkorderToDify()
processor.process()
+94 -6
View File
@@ -105,7 +105,7 @@ class DifyApi:
while idx < 5:
if self.get_document_indexing_status(dataset_id, response.json().get("batch")):
break
time.sleep(1)
time.sleep(5)
idx += 1
else:
logging.warning("文档索引超时,可能需要手动检查。")
@@ -221,12 +221,102 @@ class DifyApi:
# 获取所有文档
doc_info = self.get_documents(dataset_id, keyword=document_name)
for doc_id, info in doc_info.items():
if info["name"].split('.')[0] == document_name:
if info["name"] == document_name:
return doc_id
print(f'获取文档ID失败。名称: {document_name}。原因:未找到文档ID')
return ''
def add_document_metadata(self, dataset_id: str, document_id: str, metadata_list: List[Dict]) -> bool:
"""
为文档添加元数据。
:param dataset_id: 数据集ID。
:param document_id: 文档ID。
:param metadata_list: 元数据列表,每项包含id、name和value。
:return: 如果添加成功返回True,否则返回False。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/documents/metadata"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}',
'Content-Type': 'application/json'
}
data = {
"operation_data": [
{
"document_id": document_id,
"metadata_list": metadata_list
}
]
}
try:
response = requests.post(url, headers=headers, data=json.dumps(data), verify=False)
if response.status_code == 200:
logging.info(f"成功为文档 {document_id} 添加元数据")
return True
else:
logging.error(f"添加元数据失败,状态码: {response.status_code}, 响应: {response.text}")
return False
except Exception as e:
logging.error(f"添加元数据请求失败: {e}")
return False
def get_dataset_metadata(self, dataset_id: str) -> List[Dict]:
"""
获取数据集的元数据。
:param dataset_id: 数据集ID。
:return: 元数据列表,如果获取失败则返回空列表。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/metadata"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}'
}
try:
response = requests.get(url, headers=headers, verify=False)
if response.status_code == 200:
return response.json()
else:
logging.error(f"获取数据集元数据失败,状态码: {response.status_code}, 响应: {response.text}")
return []
except Exception as e:
logging.error(f"获取数据集元数据请求失败: {e}")
return []
def create_dataset_metadata(self, dataset_id: str, metadata_type: str, metadata_name: str) -> Dict:
"""
创建数据集元数据。
:param dataset_id: 数据集ID。
:param metadata_type: 元数据类型,如"string"
:param metadata_name: 元数据名称。
:return: 创建的元数据信息,如果创建失败则返回空字典。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/metadata"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}',
'Content-Type': 'application/json'
}
data = {
"type": metadata_type,
"name": metadata_name
}
try:
response = requests.post(url, headers=headers, data=json.dumps(data), verify=False)
if response.json()["id"]:
logging.info(f"成功创建数据集元数据: {metadata_name}")
return response.json()
else:
logging.error(f"创建数据集元数据失败,状态码: {response.status_code}, 响应: {response.text}")
return {}
except Exception as e:
logging.error(f"创建数据集元数据请求失败: {e}")
return {}
def get_document_last_update_time(self, dataset_id: str, document_name: str) -> str:
"""
获取指定文档的最后更新时间。
@@ -638,9 +728,7 @@ if __name__ == '__main__':
load_dotenv()
d = DifyApi()
id = d.upload_file(r"D:\Code\DataConvertUpload\wiki3todify\images\5fd27f31858f808f7659165628bfb8a7.png")
print(id)
# d.remove_dataset_all_doc("0b835829-4d47-4419-832f-3cd6d9510b87")
# d.remove_dataset_all_doc("8673162d-0db1-4752-905e-ae3ef377a541")
# d.remove_dataset_all_doc("78abfb73-7e12-4dd4-92ff-b377b0235690")
# d.remove_dataset_all_doc("841b890e-c769-4839-8314-70756c0bf3c1")