From ba4210799910bf2395c310fb9466f87966e31dca Mon Sep 17 00:00:00 2001 From: ouyangyouzhang Date: Mon, 28 Jul 2025 08:34:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96DifyCompareTest=E5=92=8CWorko?= =?UTF-8?q?rderToDify=E6=A8=A1=E5=9D=97=EF=BC=8C=E8=B0=83=E6=95=B4?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E8=AE=B0=E5=BD=95=E6=A0=BC=E5=BC=8F=EF=BC=8C?= =?UTF-8?q?=E4=BF=AE=E5=A4=8DAPI=E5=AF=86=E9=92=A5=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=EF=BC=8C=E5=A2=9E=E5=BC=BA=E5=B7=A5=E5=8D=95?= =?UTF-8?q?=E5=A4=84=E7=90=86=E6=B5=81=E7=A8=8B=EF=BC=8C=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E5=85=83=E6=95=B0=E6=8D=AE=E7=AE=A1=E7=90=86=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=EF=BC=8C=E6=94=B9=E8=BF=9B=E5=B9=B6=E5=8F=91=E4=B8=8A=E4=BC=A0?= =?UTF-8?q?=E9=80=BB=E8=BE=91=EF=BC=8C=E6=9B=B4=E6=96=B0=E6=96=87=E6=A1=A3?= =?UTF-8?q?=E5=A4=84=E7=90=86=E6=96=B9=E5=BC=8F=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rag2_0/dify/DifyCompareTest.py | 41 ++-- rag2_0/dify/WorkorderToDify.py | 277 ++++++++++++++++++++++++---- rag2_0/dify/dify_client/dify_api.py | 102 +++++++++- 3 files changed, 356 insertions(+), 64 deletions(-) diff --git a/rag2_0/dify/DifyCompareTest.py b/rag2_0/dify/DifyCompareTest.py index 78e6e66..292ccc9 100755 --- a/rag2_0/dify/DifyCompareTest.py +++ b/rag2_0/dify/DifyCompareTest.py @@ -26,7 +26,7 @@ if not os.path.exists(log_dir): os.makedirs(log_dir) # 生成带时间戳的日志文件名 -log_file = os.path.join(log_dir, f'dify_compare_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log') +log_file = os.path.join(log_dir, f'dify_compare_{datetime.now().strftime("%Y%m%d")}.log') import logging @@ -43,7 +43,7 @@ logging.basicConfig( class DifyCompareTest: def __init__(self): # 词条与工单同时检索 - self.both_wiki_worker_client = ChatClient(api_key="app-CPoOMaGDsLRPAe9TW7Xjhszy", base_url=os.getenv("DIFY_BSAE_URL")) + self.both_wiki_worker_client = ChatClient(api_key=os.getenv("DIFY_APP_KEY"), base_url=os.getenv("DIFY_BSAE_URL")) self.llm = OpenAiLLM(base_url=os.getenv("OPENAI_API_BASE"), model=os.getenv("MODEL_NAME")) def llm_judge_answer(self, old_answer: str, now_answer: str): @@ -100,22 +100,31 @@ class DifyCompareTest: answer = result.get('answer', "") if len(answer) == 0: raise Exception(f"回答为空: {result}") - judge_result = self.llm_judge_answer(old_answer=old_answer, now_answer=answer) - return answer, judge_result + if old_answer: + judge_result = self.llm_judge_answer(old_answer=old_answer, now_answer=answer) + else: + judge_result="" + # 只取回答的前半部分 + answer = answer.split("----------------------------------------")[0].strip() + message_id = result.get('message_id', "") + return answer, judge_result, message_id except Exception as e: retry_count += 1 if retry_count >= max_retries: logging.error(f"词条与工单同时检索调用失败 (尝试 {max_retries} 次后): {e}") - return '', '' + return '', '', '' else: import time - time.sleep(1) # 等待1秒后重试 + time.sleep(10) # 等待1秒后重试 def process_single_row(self, index, row): """处理单行数据的方法""" try: query = row["提问"] - old_answer = row["回答"] + if "回答" in row: + old_answer = row["回答"] + else: + old_answer = "" current_software = row["当前软件"] inputs = { @@ -124,7 +133,7 @@ class DifyCompareTest: } # 调用词条与工单同时检索工作流 - answer, judge_result = self.process_workflow( + answer, judge_result, message_id = self.process_workflow( self.both_wiki_worker_client, inputs, query, @@ -133,17 +142,17 @@ class DifyCompareTest: # 构建结果 result_row = row.copy() - result_row["词条与工单同时回答"] = answer - result_row["词条与工单同时回答对比"] = judge_result - + result_row["message_id"] = message_id + result_row["回答"] = answer + # result_row["词条与工单同时回答对比"] = judge_result logging.info(f"成功处理第 {index + 1} 行数据") return index, result_row except Exception as e: logging.error(f"处理第 {index + 1} 行数据时出错: {e}") result_row = row.copy() - result_row["词条与工单同时回答"] = '' - result_row["词条与工单同时回答对比"] = '' + result_row["回答"] = '' + result_row["message_id"] = '' return index, result_row @@ -166,7 +175,7 @@ class DifyCompareTest: logging.info(f"成功读取Excel文件: {excel_path}, 共 {len(df)} 行数据") # 验证必要的列是否存在 - required_columns = ["提问", "回答", "当前软件"] + required_columns = ["提问", "当前软件"] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: logging.error(f"Excel文件缺少必要的列: {missing_columns}") @@ -199,8 +208,6 @@ class DifyCompareTest: logging.error(f"线程执行失败 (行{original_index + 1}): {e}") # 添加失败的行 result_row = df.iloc[original_index].copy() - result_row["词条与工单同时回答"] = '线程执行失败' - result_row["词条与工单同时回答对比"] = '线程执行失败' results[original_index] = result_row pbar.update(1) @@ -223,7 +230,7 @@ if __name__ == "__main__": # 处理第一个文件 excel_files = [ # ("data/excel/5月.xlsx", "data/excel/5月问答对比.xlsx"), - ("data/excel/其他月.xlsx", "data/excel/其他月问答对比.xlsx") + ("data/excel/第四轮问题-Part2.xlsx", "data/excel/第四轮问题-Part2-问答测试.xlsx") ] for excel_path, save_path in excel_files: diff --git a/rag2_0/dify/WorkorderToDify.py b/rag2_0/dify/WorkorderToDify.py index 8b007fe..4869aa6 100644 --- a/rag2_0/dify/WorkorderToDify.py +++ b/rag2_0/dify/WorkorderToDify.py @@ -1,50 +1,247 @@ import os import sys +import datetime +import logging +import concurrent.futures +import threading + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + handlers=[ + logging.StreamHandler(), + logging.FileHandler(f'data/logs/WorkorderToDify_{datetime.datetime.now().strftime("%Y%m%d")}.log', encoding='utf-8') + ] +) sys.path.append(os.getcwd()) import rag2_0.dify.dify_client.dify_api as DifyApi - import pandas as pd -pd_data = pd.read_excel("data/excel/工单汇总(给AI)_工单拆分.xlsx") -dify_api = DifyApi.DifyApi() -peiwang_dataset_id = dify_api.get_or_create_dataset_by_name("配网工单数据") -zhuwang_dataset_id = dify_api.get_or_create_dataset_by_name("主网工单数据") -jianga_dataset_id = dify_api.get_or_create_dataset_by_name("技改工单数据") -chuneng_dataset_id = dify_api.get_or_create_dataset_by_name("储能工单数据") -xizang_dataset_id = dify_api.get_or_create_dataset_by_name("西藏工单数据") - - -soft_segments_list={} -for index, row in pd_data.iterrows(): - query = row["客户问题"] - answer = row["解决方案"] - skill_group = row["技能组"] +class WorkorderToDify: + def __init__(self, excel_path="data/excel/2025.1-6月工单(人工整理后).xlsx"): + self.pd_data = pd.read_excel(excel_path) + self.dify_api = DifyApi.DifyApi() + self.dataset_ids = {} + self.skill_group_data = {} + self.metadata_ids = {} # 用于缓存元数据ID + + # 初始化各技能组的数据集ID + self.dataset_ids["博微配网计价通D3"] = self.dify_api.get_or_create_dataset_by_name("配网工单数据") + self.dataset_ids["博微电力建设计价通软件"] = self.dify_api.get_or_create_dataset_by_name("主网工单数据") + self.dataset_ids["博微技改检修计价通T1软件"] = self.dify_api.get_or_create_dataset_by_name("技改工单数据") + self.dataset_ids["新能源系列"] = self.dify_api.get_or_create_dataset_by_name("储能工单数据") + self.dataset_ids["博微西藏计价通Z1"] = self.dify_api.get_or_create_dataset_by_name("西藏工单数据") + self.dataset_ids["通用"] = self.dify_api.get_or_create_dataset_by_name("通用工单数据") - content = f"问题:{query}\n回答:{answer}" - if skill_group not in soft_segments_list: - soft_segments_list[skill_group]=[] - soft_segments_list[skill_group].append({ - "content": str(content), - "answer": "", - "keywords": [] - }) + def check_and_create_metadata(self): + """检查并创建workorder_time元数据,同时缓存元数据ID""" + for skill_group, dataset_id in self.dataset_ids.items(): + # 获取当前数据集的所有元数据 + metadata_info = self.dify_api.get_dataset_metadata(dataset_id) + metadata_list = metadata_info['doc_metadata'] + + # 查找workorder_time元数据 + workorder_time_id = None + has_workorder_time = False + + for metadata in metadata_list: + if metadata.get("name") == "workorder_time": + has_workorder_time = True + workorder_time_id = metadata.get("id") + break + + # 如果不存在,则创建 + if not has_workorder_time: + metadata = self.dify_api.create_dataset_metadata(dataset_id, "string", "workorder_time") + if metadata and "id" in metadata: + workorder_time_id = metadata["id"] + + # 缓存元数据ID + if workorder_time_id: + self.metadata_ids[skill_group] = {"workorder_time": workorder_time_id} + else: + logging.error(f"无法获取或创建 {skill_group} 的 workorder_time 元数据ID") + + def classify_workorders(self): + """按技能组分类工单""" + logging.info("开始按技能组分类工单") + total_count = len(self.pd_data) + processed_count = 0 + error_count = 0 + + for index, row in self.pd_data.iterrows(): + try: + query = row["客户问题"] + answer = row["解决方案"] + skill_group = row["产品线"] + if skill_group=="" or pd.isna(skill_group): + skill_group="通用" + create_time = row["创建时间"] # 2025-07-22 15:00:35 + if isinstance(create_time, str): + try: + # 尝试原始格式 %Y-%m-%d %H:%M:%S + create_time = datetime.datetime.strptime(create_time, "%Y-%m-%d %H:%M:%S") + except ValueError: + try: + # 尝试格式 %Y/%m/%d %H:%M + create_time = datetime.datetime.strptime(create_time, "%Y/%m/%d %H:%M") + except ValueError: + # 如果仍然失败,记录错误并尝试其他可能的格式 + try: + # 尝试格式 %Y/%m/%d + create_time = datetime.datetime.strptime(create_time, "%Y/%m/%d") + except ValueError: + raise ValueError(f"创建时间格式错误: {create_time}") -for skill_group, segments_list in soft_segments_list.items(): - if skill_group == "配网": - dataset_id = peiwang_dataset_id - elif skill_group == "主网": - dataset_id = zhuwang_dataset_id - elif skill_group == "技改": - dataset_id = jianga_dataset_id - elif skill_group == "储能": - dataset_id = chuneng_dataset_id - elif skill_group == "西藏": - dataset_id = xizang_dataset_id - else: - continue - document_id = dify_api.get_document_id(dataset_id=dataset_id, document_name=f"{skill_group}工单数据") - if not document_id: - document_id = dify_api.upload_text_to_document(text_name=f"{skill_group}工单数据", text="", dataset_id=dataset_id) - dify_api.add_document_segments(dataset_id=dataset_id, document_id=document_id, segments_list=segments_list) \ No newline at end of file + if not isinstance(create_time, datetime.datetime): + raise ValueError(f"创建时间格式错误: {create_time}") + conversation_id = row["会话id"] + + content = f"问题:{query}\n回答:{answer}" + + if skill_group not in self.skill_group_data: + self.skill_group_data[skill_group] = [] + + self.skill_group_data[skill_group].append({ + "document_name": query, + "content": content, + "create_time": create_time, + "conversation_id": conversation_id + }) + + processed_count += 1 + if processed_count % 100 == 0: + logging.info(f"已处理 {processed_count}/{total_count} 条工单") + + except Exception as e: + error_count += 1 + logging.error(f"处理第 {index} 行工单时出错: {str(e)}") + logging.error(f"错误工单内容: {row.to_dict()}") + continue + + logging.info(f"工单分类完成,共处理 {processed_count} 条,错误 {error_count} 条") + for skill_group, data in self.skill_group_data.items(): + logging.info(f"技能组 {skill_group}: {len(data)} 条工单") + + def deduplicate_workorders(self): + """对每个技能组内的工单进行去重,保留时间最新的""" + logging.info("开始对工单进行去重处理") + for skill_group in self.skill_group_data: + logging.info(f"处理技能组: {skill_group}, 去重前工单数量: {len(self.skill_group_data[skill_group])}") + # 创建一个临时字典,用于存储每个客户问题的最新工单 + latest_workorders = {} + + for workorder in self.skill_group_data[skill_group]: + query = workorder["document_name"] + create_time = workorder["create_time"] + + # 如果该问题尚未在字典中或当前工单的时间比已有的更新 + if query not in latest_workorders or create_time > latest_workorders[query]["datetime"]: + latest_workorders[query] = { + "workorder": workorder, + "datetime": create_time + } + logging.debug(f"更新工单: {query}, 时间: {create_time}") + + # 用去重后的工单列表替换原列表 + self.skill_group_data[skill_group] = [item["workorder"] for item in latest_workorders.values()] + logging.info(f"技能组 {skill_group} 去重完成, 去重后工单数量: {len(self.skill_group_data[skill_group])}") + + logging.info("所有技能组工单去重处理完成") + + def upload_workorders(self): + """上传每个技能组的工单作为独立文档""" + logging.info("开始上传工单文档") + total_docs = sum(len(docs) for docs in self.skill_group_data.values()) + processed_count = 0 + error_count = 0 + + # 创建线程锁,用于保护计数器更新 + lock = threading.Lock() + + # 创建一个线程池 + max_workers = min(20, os.cpu_count() * 5) # 最多20个线程,或者CPU核心数的5倍 + logging.info(f"创建线程池,最大线程数: {max_workers}") + + def upload_document(args): + skill_group, doc, dataset_id, workorder_time_id = args + nonlocal processed_count, error_count + + try: + document_id = self.dify_api.get_document_id(dataset_id=dataset_id, document_name=doc["document_name"]) + if document_id: + # 如果文档已存在,先删除 + self.dify_api.del_document_by_id(dataset_id=dataset_id, document_id=document_id) + + # 上传文档 + document_id = self.dify_api.upload_text_to_document( + text_name=doc["document_name"], + text=doc["content"], + dataset_id=dataset_id + ) + create_time_str = doc["create_time"].strftime("%Y-%m-%d %H:%M") + # 上传成功后,添加创建时间作为元数据 + if document_id: + metadata_list = [ + { + "id": workorder_time_id, + "name": "workorder_time", + "value": create_time_str + } + ] + self.dify_api.add_document_metadata(dataset_id, document_id, metadata_list) + + with lock: + processed_count += 1 + if processed_count % 10 == 0 or processed_count == total_docs: + logging.info(f"已上传 {processed_count}/{total_docs} 个文档") + + return True + except Exception as e: + with lock: + error_count += 1 + logging.error(f"上传文档 '{doc['document_name']}' 失败: {str(e)}") + return False + + # 准备上传任务列表 + upload_tasks = [] + for skill_group, documents in self.skill_group_data.items(): + if skill_group not in self.dataset_ids: + logging.warning(f"技能组 '{skill_group}' 没有对应的数据集ID,跳过上传") + continue + + dataset_id = self.dataset_ids[skill_group] + + # 检查是否有缓存的元数据ID + if skill_group not in self.metadata_ids or "workorder_time" not in self.metadata_ids[skill_group]: + logging.error(f"未找到 {skill_group} 的 workorder_time 元数据ID,跳过上传") + continue + + workorder_time_id = self.metadata_ids[skill_group]["workorder_time"] + + # 为每个工单创建独立文档 + for doc in documents: + upload_tasks.append((skill_group, doc, dataset_id, workorder_time_id)) + + # 使用线程池并发执行上传任务 + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + results = list(executor.map(upload_document, upload_tasks)) + + success_count = sum(1 for result in results if result) + logging.info(f"工单上传完成,总计: {total_docs},成功: {success_count},失败: {error_count}") + + def process(self): + """执行完整的工单处理流程""" + self.check_and_create_metadata() + self.classify_workorders() + self.deduplicate_workorders() + self.upload_workorders() + + +if __name__ == "__main__": + processor = WorkorderToDify() + processor.process() \ No newline at end of file diff --git a/rag2_0/dify/dify_client/dify_api.py b/rag2_0/dify/dify_client/dify_api.py index 0b2c021..ca74114 100644 --- a/rag2_0/dify/dify_client/dify_api.py +++ b/rag2_0/dify/dify_client/dify_api.py @@ -105,7 +105,7 @@ class DifyApi: while idx < 5: if self.get_document_indexing_status(dataset_id, response.json().get("batch")): break - time.sleep(1) + time.sleep(5) idx += 1 else: logging.warning("文档索引超时,可能需要手动检查。") @@ -221,11 +221,101 @@ class DifyApi: # 获取所有文档 doc_info = self.get_documents(dataset_id, keyword=document_name) for doc_id, info in doc_info.items(): - if info["name"].split('.')[0] == document_name: + if info["name"] == document_name: return doc_id - print(f'获取文档ID失败。名称: {document_name}。原因:未找到文档ID') return '' + + def add_document_metadata(self, dataset_id: str, document_id: str, metadata_list: List[Dict]) -> bool: + """ + 为文档添加元数据。 + + :param dataset_id: 数据集ID。 + :param document_id: 文档ID。 + :param metadata_list: 元数据列表,每项包含id、name和value。 + :return: 如果添加成功返回True,否则返回False。 + """ + url = f"{self.dify_url}/datasets/{dataset_id}/documents/metadata" + headers = { + 'Authorization': f'Bearer {self.dify_dataset_api_key}', + 'Content-Type': 'application/json' + } + + data = { + "operation_data": [ + { + "document_id": document_id, + "metadata_list": metadata_list + } + ] + } + + try: + response = requests.post(url, headers=headers, data=json.dumps(data), verify=False) + if response.status_code == 200: + logging.info(f"成功为文档 {document_id} 添加元数据") + return True + else: + logging.error(f"添加元数据失败,状态码: {response.status_code}, 响应: {response.text}") + return False + except Exception as e: + logging.error(f"添加元数据请求失败: {e}") + return False + + def get_dataset_metadata(self, dataset_id: str) -> List[Dict]: + """ + 获取数据集的元数据。 + + :param dataset_id: 数据集ID。 + :return: 元数据列表,如果获取失败则返回空列表。 + """ + url = f"{self.dify_url}/datasets/{dataset_id}/metadata" + headers = { + 'Authorization': f'Bearer {self.dify_dataset_api_key}' + } + + try: + response = requests.get(url, headers=headers, verify=False) + if response.status_code == 200: + return response.json() + else: + logging.error(f"获取数据集元数据失败,状态码: {response.status_code}, 响应: {response.text}") + return [] + except Exception as e: + logging.error(f"获取数据集元数据请求失败: {e}") + return [] + + def create_dataset_metadata(self, dataset_id: str, metadata_type: str, metadata_name: str) -> Dict: + """ + 创建数据集元数据。 + + :param dataset_id: 数据集ID。 + :param metadata_type: 元数据类型,如"string"。 + :param metadata_name: 元数据名称。 + :return: 创建的元数据信息,如果创建失败则返回空字典。 + """ + url = f"{self.dify_url}/datasets/{dataset_id}/metadata" + headers = { + 'Authorization': f'Bearer {self.dify_dataset_api_key}', + 'Content-Type': 'application/json' + } + + data = { + "type": metadata_type, + "name": metadata_name + } + + try: + response = requests.post(url, headers=headers, data=json.dumps(data), verify=False) + if response.json()["id"]: + logging.info(f"成功创建数据集元数据: {metadata_name}") + return response.json() + else: + logging.error(f"创建数据集元数据失败,状态码: {response.status_code}, 响应: {response.text}") + return {} + except Exception as e: + logging.error(f"创建数据集元数据请求失败: {e}") + return {} def get_document_last_update_time(self, dataset_id: str, document_name: str) -> str: """ @@ -638,9 +728,7 @@ if __name__ == '__main__': load_dotenv() d = DifyApi() - id = d.upload_file(r"D:\Code\DataConvertUpload\wiki3todify\images\5fd27f31858f808f7659165628bfb8a7.png") - print(id) - - # d.remove_dataset_all_doc("0b835829-4d47-4419-832f-3cd6d9510b87") + + # d.remove_dataset_all_doc("8673162d-0db1-4752-905e-ae3ef377a541") # d.remove_dataset_all_doc("78abfb73-7e12-4dd4-92ff-b377b0235690") # d.remove_dataset_all_doc("841b890e-c769-4839-8314-70756c0bf3c1")