Files
QueryRewrite/rag2_0/dify/WorkorderToDify.py
T
ouyangyouzhang 53ac47f4a5 1、调整规费相关问题的分类
2、意图识别增加清单、定额字段
2025-08-21 17:51:55 +08:00

272 lines
12 KiB
Python

import os
import sys
import datetime
import logging
import concurrent.futures
import threading
from dotenv import load_dotenv
load_dotenv()
# Configure logging: console output plus one UTF-8 log file per calendar day.
# logging.FileHandler opens its file as soon as it is constructed, so the
# target directory has to exist first; otherwise importing this module fails
# with FileNotFoundError on a fresh checkout.
os.makedirs("data/logs", exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(f'data/logs/WorkorderToDify_{datetime.datetime.now().strftime("%Y%m%d")}.log', encoding='utf-8')
    ]
)
sys.path.append(os.getcwd())
import rag2_0.dify.dify_client.dify_api as DifyApi
import pandas as pd
class WorkorderToDify:
    """Import customer work orders from an Excel workbook into Dify datasets.

    The pipeline (see ``process``) makes sure every target dataset has a
    ``workorder_time`` metadata field, groups the rows by product line,
    merges rows that share the same question, and finally uploads every
    merged work order as its own document.
    """

    # Product line (value of the "产品线" column) -> name of the Dify dataset
    # that receives its work orders.
    DATASET_NAMES = {
        "博微配网计价通D3": "配网工单数据",
        "博微电力建设计价通软件": "主网工单数据",
        "博微技改检修计价通T1软件": "技改工单数据",
        "新能源系列": "储能工单数据",
        "博微西藏计价通Z1": "西藏工单数据",
        "通用": "通用工单数据",
    }

    # Accepted textual layouts of the "创建时间" column, tried in order.
    CREATE_TIME_FORMATS = ("%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M", "%Y/%m/%d")

    def __init__(self, excel_path="data/excel/2025.1-6月工单(已补充标签)-人工删减.xlsx"):
        """Load the workbook and resolve one dataset per product line.

        Args:
            excel_path: Path of the work-order workbook, read with
                ``pandas.read_excel``.
        """
        self.pd_data = pd.read_excel(excel_path)
        self.dify_api = DifyApi.DifyApi()
        self.skill_group_data = {}  # product line -> list of work-order dicts
        self.metadata_ids = {}  # product line -> {"workorder_time": metadata id}
        # product line -> value returned by get_or_create_dataset_by_name
        self.dataset_ids = {
            skill_group: self.dify_api.get_or_create_dataset_by_name(dataset_name)
            for skill_group, dataset_name in self.DATASET_NAMES.items()
        }

    def check_and_create_metadata(self):
        """Ensure each dataset has a ``workorder_time`` field and cache its id.

        For every dataset the existing metadata definitions are fetched; when
        no field named ``workorder_time`` is present a string field is
        created. The resulting id is stored in ``self.metadata_ids``; if no id
        can be determined an error is logged and the product line is left out
        of the cache.
        """
        for skill_group, dataset_id in self.dataset_ids.items():
            metadata_info = self.dify_api.get_dataset_metadata(dataset_id)
            existing = next(
                (
                    item
                    for item in metadata_info['doc_metadata']
                    if item.get("name") == "workorder_time"
                ),
                None,
            )

            workorder_time_id = None
            if existing is not None:
                workorder_time_id = existing.get("id")
            else:
                created = self.dify_api.create_dataset_metadata(dataset_id, "string", "workorder_time")
                if created and "id" in created:
                    workorder_time_id = created["id"]

            if workorder_time_id:
                self.metadata_ids[skill_group] = {"workorder_time": workorder_time_id}
            else:
                logging.error(f"无法获取或创建 {skill_group} 的 workorder_time 元数据ID")

    @staticmethod
    def _parse_create_time(create_time):
        """Return *create_time* as a real ``datetime``.

        Strings are parsed with ``CREATE_TIME_FORMATS``; datetime objects are
        passed through unchanged.

        Raises:
            ValueError: If the value is a string in an unknown layout, is not
                a datetime at all, or is a missing timestamp (``pd.NaT``).
        """
        parsed = create_time
        if isinstance(create_time, str):
            parsed = None
            for time_format in WorkorderToDify.CREATE_TIME_FORMATS:
                try:
                    parsed = datetime.datetime.strptime(create_time, time_format)
                    break
                except ValueError:
                    continue
        # pd.NaT is an instance of datetime but cannot be formatted or ordered
        # meaningfully, so it is rejected here instead of failing at upload.
        if not isinstance(parsed, datetime.datetime) or pd.isna(parsed):
            raise ValueError(f"创建时间格式错误: {create_time}")
        return parsed

    def classify_workorders(self):
        """Group the spreadsheet rows by product line.

        Each valid row is appended to ``self.skill_group_data[product line]``
        as a dict with ``document_name``, ``content``, ``create_time`` and
        ``conversation_id``. Rows with an empty product line go to "通用".
        A row that cannot be processed is logged and skipped so that one bad
        row does not abort the whole import.
        """
        logging.info("开始按技能组分类工单")
        total_count = len(self.pd_data)
        processed_count = 0
        error_count = 0

        for index, row in self.pd_data.iterrows():
            try:
                query = row["客户问题"]
                answer = row["解决方案"]
                skill_group = row["产品线"]
                if skill_group == "" or pd.isna(skill_group):
                    skill_group = "通用"
                create_time = self._parse_create_time(row["创建时间"])
                conversation_id = row["会话id"]

                self.skill_group_data.setdefault(skill_group, []).append({
                    "document_name": query,
                    "content": f"问题:{query}\n回答:{answer}",
                    "create_time": create_time,
                    "conversation_id": conversation_id,
                })
                processed_count += 1
                if processed_count % 100 == 0:
                    logging.info(f"已处理 {processed_count}/{total_count} 条工单")
            except Exception as e:
                # Deliberate best effort: record the offending row and go on.
                error_count += 1
                logging.error(f"处理第 {index} 行工单时出错: {str(e)}")
                logging.error(f"错误工单内容: {row.to_dict()}")

        logging.info(f"工单分类完成,共处理 {processed_count} 条,错误 {error_count} 条")
        for skill_group, data in self.skill_group_data.items():
            logging.info(f"技能组 {skill_group}: {len(data)} 条工单")

    def deduplicate_workorders(self):
        """Merge work orders that share the same question within a product line.

        The merged document keeps the first occurrence's position, joins the
        contents of all occurrences with a ``---`` separator (in input order),
        and carries the ``create_time`` and ``conversation_id`` of the most
        recent occurrence.
        """
        logging.info("开始对工单进行去重处理")

        for skill_group, workorders in self.skill_group_data.items():
            logging.info(f"处理技能组: {skill_group}, 去重前工单数量: {len(workorders)}")

            # question -> {"workorder": merged record, "contents": [texts]}
            merged_workorders = {}
            for workorder in workorders:
                query = workorder["document_name"]
                entry = merged_workorders.get(query)
                if entry is None:
                    # Copy so the merge never mutates the caller's dicts.
                    merged_workorders[query] = {
                        "workorder": dict(workorder),
                        "contents": [workorder["content"]],
                    }
                    continue

                entry["contents"].append(workorder["content"])
                if workorder["create_time"] > entry["workorder"]["create_time"]:
                    entry["workorder"]["create_time"] = workorder["create_time"]
                    entry["workorder"]["conversation_id"] = workorder["conversation_id"]

            result_workorders = []
            for entry in merged_workorders.values():
                merged = entry["workorder"]
                merged["content"] = "\n\n---\n\n".join(entry["contents"])
                result_workorders.append(merged)

            # Re-assigning an existing key is safe while iterating the dict.
            self.skill_group_data[skill_group] = result_workorders
            logging.info(f"技能组 {skill_group} 去重完成, 去重后工单数量: {len(result_workorders)}")

        logging.info("所有技能组工单去重处理完成")

    def del_all_document(self):
        """Remove every document from every configured dataset."""
        for dataset_id in self.dataset_ids.values():
            self.dify_api.remove_dataset_all_doc(dataset_id)

    def upload_workorders(self):
        """Upload every classified work order as an individual document.

        Product lines without a dataset id or without a cached
        ``workorder_time`` metadata id are skipped. For each remaining
        document an existing document with the same name is deleted, the text
        is uploaded, and the creation time is attached as metadata. Uploads
        run concurrently in a thread pool; a failure is logged and counted
        without stopping the remaining uploads.
        """
        logging.info("开始上传工单文档")

        # Build the task list first so that progress and the final summary
        # are measured against the documents that will really be uploaded.
        upload_tasks = []
        skipped_docs = 0
        for skill_group, documents in self.skill_group_data.items():
            if skill_group not in self.dataset_ids:
                logging.warning(f"技能组 '{skill_group}' 没有对应的数据集ID,跳过上传")
                skipped_docs += len(documents)
                continue
            if skill_group not in self.metadata_ids or "workorder_time" not in self.metadata_ids[skill_group]:
                logging.error(f"未找到 {skill_group} 的 workorder_time 元数据ID,跳过上传")
                skipped_docs += len(documents)
                continue
            dataset_id = self.dataset_ids[skill_group]
            workorder_time_id = self.metadata_ids[skill_group]["workorder_time"]
            upload_tasks.extend(
                (skill_group, doc, dataset_id, workorder_time_id) for doc in documents
            )
        if skipped_docs:
            logging.warning(f"共有 {skipped_docs} 个文档因缺少数据集ID或元数据ID被跳过")

        total_docs = len(upload_tasks)
        processed_count = 0
        lock = threading.Lock()  # guards processed_count across worker threads

        # At most 20 threads, otherwise twice the CPU count. os.cpu_count()
        # may return None when the count cannot be determined.
        max_workers = min(20, (os.cpu_count() or 1) * 2)
        logging.info(f"创建线程池,最大线程数: {max_workers}")

        def upload_document(args):
            """Upload one document; return True on success, False on failure."""
            nonlocal processed_count
            skill_group, doc, dataset_id, workorder_time_id = args
            try:
                # Format the timestamp before touching the remote side so a
                # bad value cannot leave a document uploaded without metadata.
                create_time_str = doc["create_time"].strftime("%Y-%m-%d %H:%M")

                document_id = self.dify_api.get_document_id(dataset_id=dataset_id, document_name=doc["document_name"])
                if document_id:
                    # Replace an existing document of the same name.
                    self.dify_api.del_document_by_id(dataset_id=dataset_id, document_id=document_id)

                document_id = self.dify_api.upload_text_to_document(
                    text_name=doc["document_name"],
                    text=doc["content"],
                    dataset_id=dataset_id
                )
                if not document_id:
                    # NOTE(review): assumes a falsy return value means the
                    # upload did not succeed — confirm against DifyApi.
                    raise RuntimeError("upload_text_to_document 未返回文档ID")

                self.dify_api.add_document_metadata(dataset_id, document_id, [
                    {
                        "id": workorder_time_id,
                        "name": "workorder_time",
                        "value": create_time_str
                    }
                ])
            except Exception as e:
                logging.error(f"上传文档 '{doc['document_name']}' 失败: {str(e)}")
                return False

            with lock:
                processed_count += 1
                finished = processed_count
            if finished % 10 == 0 or finished == total_docs:
                logging.info(f"已上传 {finished}/{total_docs} 个文档")
            return True

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(executor.map(upload_document, upload_tasks))

        success_count = sum(1 for result in results if result)
        error_count = len(results) - success_count
        logging.info(f"工单上传完成,总计: {total_docs},成功: {success_count},失败: {error_count}")

    def process(self):
        """Run the complete import: metadata, classify, deduplicate, upload."""
        self.check_and_create_metadata()
        self.classify_workorders()
        self.deduplicate_workorders()
        # self.del_all_document()
        self.upload_workorders()
if __name__ == "__main__":
    # Script entry point: build the importer with its default workbook path
    # and run the whole pipeline once.
    WorkorderToDify().process()