From b8ccdcf287c7caf4b56cad14688d801430dbac78 Mon Sep 17 00:00:00 2001 From: ouyangyouzhang Date: Wed, 2 Jul 2025 15:53:51 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9EDifyApi=E7=B1=BB=E4=BB=A5?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E4=B8=8EDify=20API=E7=9A=84=E4=BA=A4?= =?UTF-8?q?=E4=BA=92=EF=BC=8C=E5=8C=85=E5=90=AB=E6=96=87=E6=A1=A3=E4=B8=8A?= =?UTF-8?q?=E4=BC=A0=E3=80=81=E7=B4=A2=E5=BC=95=E7=8A=B6=E6=80=81=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E3=80=81=E6=95=B0=E6=8D=AE=E9=9B=86=E7=AE=A1=E7=90=86?= =?UTF-8?q?=E7=AD=89=E5=8A=9F=E8=83=BD=E3=80=82=E5=90=8C=E6=97=B6=E4=BC=98?= =?UTF-8?q?=E5=8C=96DifyCompareTest.py=E4=B8=AD=E7=9A=84=E7=BC=BA=E5=A4=B1?= =?UTF-8?q?=E6=A7=BD=E4=BD=8D=E5=88=A4=E6=96=AD=E9=80=BB=E8=BE=91=EF=BC=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rag2_0/dify/DifyCompareTest.py | 5 +- rag2_0/dify/dify_client/dify_api.py | 479 ++++++++++++++++++++++++++++ rag2_0/tool/WikijsTool.py | 2 +- 3 files changed, 482 insertions(+), 4 deletions(-) create mode 100644 rag2_0/dify/dify_client/dify_api.py diff --git a/rag2_0/dify/DifyCompareTest.py b/rag2_0/dify/DifyCompareTest.py index 9823ed1..fcf625d 100755 --- a/rag2_0/dify/DifyCompareTest.py +++ b/rag2_0/dify/DifyCompareTest.py @@ -572,9 +572,8 @@ content: "{content}" slot_info_data = json.loads(slot_info) else: slot_info_data = slot_info - slot_missing = slot_info_data.get("slot_missing", None) - slot_missing_str = "完整" if not slot_missing else "缺失" - + slot_missing = slot_info_data.get("missing_slots", {}) + slot_missing_str = "完整" if len(slot_missing) == 0 else "缺失" # 返回结果 return { "问题": query, diff --git a/rag2_0/dify/dify_client/dify_api.py b/rag2_0/dify/dify_client/dify_api.py new file mode 100644 index 0000000..a9aaa88 --- /dev/null +++ b/rag2_0/dify/dify_client/dify_api.py @@ -0,0 +1,479 @@ +import json +import logging +import os +import time +import mimetypes + +import requests +from typing import Dict, List +import urllib3 +# 禁用 SSL 警告 +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +class DifyApi: + """ + 用于与Dify API进行交互的类。 + """ + + def __init__(self, dify_url: str="http://10.1.16.39/v1", + dify_dataset_api_key: str="dataset-skLjmPVonjHo119OWNf3kAmY", + dify_app_api_key: str="app-wUdkWJx5zeOvmvBUZizMoSw3"): + self.dify_url = dify_url + self.dify_dataset_api_key = dify_dataset_api_key + self.dify_app_api_key = dify_app_api_key + + def get_document_indexing_status(self, datasets_id: str, batch: str) -> bool: + """ + 获取文档的索引状态。 + + :param datasets_id: 数据集ID。 + :param batch: 批次ID。 + :return: 索引状态是否完成。 + """ + url = f'{self.dify_url}/datasets/{datasets_id}/documents/{batch}/indexing-status' + headers = { + 'Authorization': f'Bearer {self.dify_dataset_api_key}' + } + + try: + response = requests.get(url, headers=headers, verify=False) + response.raise_for_status() + return response.json().get("data", [{}])[0].get("indexing_status") == "completed" + except Exception as e: + logging.error(f"获取索引状态失败: {e}") + return False + + def upload_text_to_document(self, text_name: str, text: str, dataset_id: str, enable_subchunk:bool=False) -> str: + """ + 上传文本作为文档到Dify API并返回文档ID。 + + :param text_name: 文本文档的名称。 + :param text: 文本文档的内容。 + :param dataset_id: 数据集ID。 + :return: 创建的文档ID。 + :raises Exception: 如果文档创建失败。 + """ + url = f"{self.dify_url}/datasets/{dataset_id}/document/create_by_text" + headers = { + "Authorization": f"Bearer {self.dify_dataset_api_key}", + "Content-Type": "application/json" + } + if enable_subchunk: + doc_form='hierarchical_model' + mode='hierarchical' + else: + doc_form='text_model' + mode='custom' + process_rule = { + "rules": { + "pre_processing_rules": [ + {"id": "remove_extra_spaces", "enabled": True}, + {"id": "remove_urls_emails", "enabled": False} + ], + "segmentation": { + "separator": "==================================", + "max_tokens": 1999, + "chunk_overlap": 300 + }, + "parent_mode":"paragraph", + "subchunk_segmentation": { + "separator": "\\n", + "max_tokens": 100, + "chunk_overlap": 20 + }, + }, + "mode": mode + } + # 启用父子级模式 doc_form==hierarchical_model and mode==hierarchical + # 不启用父子级模式 doc_form==text_model and mode==custom + data = { + "name": text_name, + "text": text, + "indexing_technique": "high_quality", + "doc_form": doc_form, + "process_rule": process_rule, + "doc_language": "Chinese", + } + + try: + response = requests.post(url=url, headers=headers, data=json.dumps(data), verify=False) + response.raise_for_status() + except Exception as e: + logging.error(f"上传文档失败: {e}") + raise + + idx = 0 + while idx < 5: + if self.get_document_indexing_status(dataset_id, response.json().get("batch")): + break + time.sleep(1) + idx += 1 + else: + logging.warning("文档索引超时,可能需要手动检查。") + + return response.json().get("document", {}).get("id", "") + + def get_or_create_dataset_by_name(self, dataset_name: str) -> str: + """ + 通过名称获取或创建数据集。 + + :param dataset_name: 数据集名称。 + :return: 数据集ID。 + """ + list_dataset = self.get_all_dataset_list() + for dataset in list_dataset: + if dataset["name"] == dataset_name: + return dataset["id"] + logging.info(f"数据集不存在,创建数据集: {dataset_name}") + return self.create_dataset(dataset_name) + + def get_documents(self, dataset_id: str, keyword: str = None) -> Dict[str, dict]: + """ + 获取指定数据集的所有文档。 + + :param dataset_id: 数据集ID。 + :param keyword: 关键词,用于过滤文档。 + :return: 文档字典,键为文档ID,值为文档信息。 + """ + url = f"{self.dify_url}/datasets/{dataset_id}/documents" + headers = {"Authorization": f"Bearer {self.dify_dataset_api_key}"} + limit = 100 + page = 0 + all_document = {} + while True: + page += 1 + params = { + 'keyword': keyword, + 'page': page, + 'limit': limit + } + + response = requests.get(url, headers=headers, params=params, verify=False) + + if response.status_code == 200: + data = response.json() + parsed_data = {item["id"]: item for item in data['data']} + all_document.update(parsed_data) + if not data["has_more"]: + break + else: + print(f"获取文档失败:{response.status_code},{response.text}") + break + + return all_document + + def remove_dataset_all_doc(self, dataset_id: str): + """ + 删除数据集中所有的文档。 + + :param dataset_id: 数据集ID。 + :raises: Exception 如果文档删除失败。 + """ + list_doc_id = self.get_documents(dataset_id=dataset_id) + + delete_url = f"{self.dify_url}/datasets/{dataset_id}/documents" + + headers = {"Authorization": f"Bearer {self.dify_dataset_api_key}"} + # 删除每个文档 + for doc_id in list_doc_id: + response = requests.delete(url=f"{delete_url}/{doc_id}", headers=headers, verify=False) + if response.status_code != 200: + raise Exception(f"删除失败:{doc_id},状态码:{response.status_code},响应内容:{response.text}") + + print(f"已删除文档ID列表: {list_doc_id}") + + def create_dataset(self, dataset_name: str) -> str: + """ + 创建数据集。 + + :param dataset_name: 数据集名称。 + :return: 创建的数据集ID。 + :raises: Exception 如果数据集创建失败。 + """ + url = f'{self.dify_url}/datasets' + headers = { + 'Authorization': f'Bearer {self.dify_dataset_api_key}', + 'Content-Type': 'application/json' + } + + data = {'name': dataset_name, + "indexing_technique": "high_quality", + "permission": "all_team_members",} + + response = requests.post(url, headers=headers, data=json.dumps(data), verify=False) + + if response.status_code == 200: + return response.json()['id'] + else: + raise Exception(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}") + + def get_document_id(self, dataset_id: str, document_name: str) -> str: + """ + 获取指定名称的文档ID。 + + :param dataset_id: 数据集ID。 + :param document_name: 文档名称。 + :return: 文档ID,如果未找到则返回空字符串。 + """ + # 获取所有文档 + doc_info = self.get_documents(dataset_id, keyword=document_name) + for doc_id, info in doc_info.items(): + if info["name"].split('.')[0] == document_name: + return doc_id + + print(f'获取文档ID失败。名称: {document_name}。原因:未找到文档ID') + return '' + + def get_document_last_update_time(self, dataset_id: str, document_name: str) -> str: + """ + 获取指定文档的最后更新时间。 + + :param dataset_id: 数据集ID。 + :param document_name: 文档名称。 + :return: 最后更新时间字符串。 + """ + doc_dict = self.get_documents(dataset_id, keyword=document_name) + + if len(doc_dict) != 1: + print(f'获取失败,{doc_dict}') + return '' + val = list(doc_dict.values()) + return val[0]['created_at'] + + def del_document_by_name(self, dataset_id: str, document_name: str) -> bool: + """ + 通过文档名称删除文档。 + + :param dataset_id: 数据集ID。 + :param document_name: 文档名称。 + :return: 如果删除成功返回True,否则返回False。 + """ + document_id = self.get_document_id(dataset_id, document_name) + if document_id == '': + return False + + return self.del_document_by_id(dataset_id, document_id) + + def del_document_by_id(self, dataset_id: str, document_id: str) -> bool: + """ + 删除指定ID的文档。 + + :param dataset_id: 数据集ID。 + :param document_id: 文档ID。 + :return: 如果删除成功返回True,否则返回False。 + """ + url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}" + headers = { + 'Authorization': f'Bearer {self.dify_dataset_api_key}' + } + response = requests.delete(url, headers=headers, verify=False) + if response.status_code == 200: + print("Document deleted successfully.") + return True + else: + print(f"Failed to delete document. Status code: {response.status_code}, Response: {response.text}") + return False + + def get_all_dataset_list(self) -> List[dict]: + """ + 获取所有数据集列表。 + + :return: 数据集列表。 + :raises: Exception 如果请求失败。 + """ + url = f'{self.dify_url}/datasets' + params = { + 'page': 1, + 'limit': 100 + } + + headers = { + 'Authorization': f'Bearer {self.dify_dataset_api_key}' + } + + response = requests.get(url, headers=headers, params=params, verify=False) + + if response.status_code == 200: + return response.json()['data'] + else: + raise Exception(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}") + + def add_document_segments(self, dataset_id: str, document_id: str, segments_list: List[dict]) -> bool: + """ + 向文档添加段落。每次最多上传50条数据。 + + :param dataset_id: 数据集ID。 + :param document_id: 文档ID。 + :param segments_list: 段落列表。 + :return: 如果所有段落添加成功则返回True,否则返回False。 + :raises: Exception 如果请求失败。 + """ + url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments" + headers = { + 'Authorization': f'Bearer {self.dify_dataset_api_key}', + 'Content-Type': 'application/json' + } + + # 将segments_list按每50条数据分组 + batch_size = 50 + success = True + + for i in range(0, len(segments_list), batch_size): + batch = segments_list[i:i + batch_size] + segments = [ + { + "content": str(seg['content']), + "answer": str(seg['answer']), + "keywords": seg['keywords'] + } + for seg in batch + ] + + data = {'segments': segments} + + try: + response = requests.post(url, headers=headers, data=json.dumps(data), verify=False) + if response.status_code == 200: + response_data = response.json()['data'] + if response_data[0]['status'] == 'error': + logging.error(f"分段上传失败: {response_data[0]['error']}") + success = False + else: + raise Exception(f"添加段落失败。状态码:{response.status_code},响应:{response.text}") + + # 添加短暂延时,避免请求过于频繁 + time.sleep(0.5) + + except Exception as e: + logging.error(f"上传段落批次 {i//batch_size + 1} 失败: {e}") + success = False + + return success + + def get_document_segments(self, dataset_id: str, document_id: str) -> List[Dict]: + """ + 获取指定文档的分段信息。 + + :param dataset_id: 数据集ID。 + :param document_id: 文档ID。 + :return: 分段信息列表。 + :raises: Exception 如果请求失败。 + """ + url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments" + headers = { + 'Authorization': f'Bearer {self.dify_dataset_api_key}', + 'Content-Type': 'application/json' + } + + try: + response = requests.get(url, headers=headers, verify=False) + response.raise_for_status() # 如果响应状态码不是200,抛出异常 + return response.json().get("data", []) # 返回分段信息列表 + except Exception as e: + logging.error(f"获取文档分段失败: {e}") + raise + + def update_document_segment( + self, + dataset_id: str, + document_id: str, + segment_id: str, + content: str, + answer: str, + keywords: List[str], + enabled: bool + ) -> Dict: + """ + 更新指定文档的某个分段信息。 + + :param dataset_id: 数据集ID。 + :param document_id: 文档ID。 + :param segment_id: 分段ID。 + :param content: 分段内容。 + :param answer: 分段答案。 + :param keywords: 分段关键词列表。 + :param enabled: 是否启用分段。 + :return: 更新后的分段信息。 + :raises: Exception 如果请求失败。 + """ + url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments/{segment_id}" + headers = { + 'Authorization': f'Bearer {self.dify_dataset_api_key}', + 'Content-Type': 'application/json' + } + + # 构造请求数据 + data = { + "segment": { + "content": content, + "answer": answer, + "keywords": keywords, + "enabled": enabled, + "regenerate_child_chunks": True + } + } + + try: + response = requests.post(url, headers=headers, data=json.dumps(data), verify=False) + response.raise_for_status() # 如果响应状态码不是200,抛出异常 + if response.json()["data"]['error']: + logging.error(f"更新文档分段失败: {response.json()['data']['error']}") + raise + return response.json().get("data", {}) # 返回更新后的分段信息 + except Exception as e: + logging.error(f"更新文档分段失败: {e}") + raise + + def upload_file(self, file_path: str, max_retries: int = 3) -> str: + url = f"{self.dify_url}/files/upload" + headers = { + 'Authorization': f'Bearer {self.dify_app_api_key}' + } + + # 获取文件的MIME类型 + mime_type = mimetypes.guess_type(file_path)[0] + if not mime_type: + raise Exception(f"无法确定文件类型: {file_path}") + + # 读取文件内容 + with open(file_path, 'rb') as file: + file_content = file.read() + + for attempt in range(max_retries): + try: + files = { + 'file': ( + os.path.basename(file_path), + file_content, + mime_type + ) + } + response = requests.post(url, headers=headers, files=files, verify=False) + if response.ok: + logging.info(f"上传文件成功,文件: {file_path}") + return response.json()['id'] + else: + error_msg = f"上传文件失败,状态码: {response.status_code}, 响应: {response.text}" + if attempt == max_retries - 1: # 最后一次尝试 + raise Exception(error_msg) + logging.warning(f"第{attempt + 1}次尝试失败: {error_msg}") + time.sleep(1) # 重试前等待1秒 + except Exception as e: + if attempt == max_retries - 1: # 最后一次尝试 + logging.error(f"上传文件失败: {e}") + raise Exception(f"上传文件失败: {e}") + logging.warning(f"第{attempt + 1}次尝试失败: {e}") + time.sleep(1) # 重试前等待1秒 + + +if __name__ == '__main__': + from dotenv import load_dotenv + + load_dotenv() + + d = DifyApi() + id = d.upload_file(r"D:\Code\DataConvertUpload\wiki3todify\images\5fd27f31858f808f7659165628bfb8a7.png") + print(id) + + # d.remove_dataset_all_doc("0b835829-4d47-4419-832f-3cd6d9510b87") + # d.remove_dataset_all_doc("78abfb73-7e12-4dd4-92ff-b377b0235690") + # d.remove_dataset_all_doc("841b890e-c769-4839-8314-70756c0bf3c1") diff --git a/rag2_0/tool/WikijsTool.py b/rag2_0/tool/WikijsTool.py index 72193e5..e217680 100755 --- a/rag2_0/tool/WikijsTool.py +++ b/rag2_0/tool/WikijsTool.py @@ -148,7 +148,7 @@ class WikijsTool: response = requests.post(WikijsTool.BASE_URL, headers=WikijsTool.HEADERS, json=data) if "errors" in response.text: result = json.loads(response.content)['errors'][0]['message'] - return {} + raise ValueError(f"查询文档失败,原因:“{result}") else: return json.loads(response.content)['data']['pages']['single']