import json import logging import os import time import mimetypes import requests from typing import Dict, List import urllib3 # 禁用 SSL 警告 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class DifyApi: """ 用于与Dify API进行交互的类。 """ def __init__(self, dify_url: str="http://10.1.16.39/v1", dify_dataset_api_key: str="dataset-skLjmPVonjHo119OWNf3kAmY", dify_app_api_key: str="app-wUdkWJx5zeOvmvBUZizMoSw3"): self.dify_url = dify_url self.dify_dataset_api_key = dify_dataset_api_key self.dify_app_api_key = dify_app_api_key def get_document_indexing_status(self, datasets_id: str, batch: str) -> bool: """ 获取文档的索引状态。 :param datasets_id: 数据集ID。 :param batch: 批次ID。 :return: 索引状态是否完成。 """ url = f'{self.dify_url}/datasets/{datasets_id}/documents/{batch}/indexing-status' headers = { 'Authorization': f'Bearer {self.dify_dataset_api_key}' } try: response = requests.get(url, headers=headers, verify=False) response.raise_for_status() return response.json().get("data", [{}])[0].get("indexing_status") == "completed" except Exception as e: logging.error(f"获取索引状态失败: {e}") return False def upload_text_to_document(self, text_name: str, text: str, dataset_id: str, enable_subchunk:bool=False) -> str: """ 上传文本作为文档到Dify API并返回文档ID。 :param text_name: 文本文档的名称。 :param text: 文本文档的内容。 :param dataset_id: 数据集ID。 :return: 创建的文档ID。 :raises Exception: 如果文档创建失败。 """ url = f"{self.dify_url}/datasets/{dataset_id}/document/create_by_text" headers = { "Authorization": f"Bearer {self.dify_dataset_api_key}", "Content-Type": "application/json" } if enable_subchunk: doc_form='hierarchical_model' mode='hierarchical' else: doc_form='text_model' mode='custom' process_rule = { "rules": { "pre_processing_rules": [ {"id": "remove_extra_spaces", "enabled": True}, {"id": "remove_urls_emails", "enabled": False} ], "segmentation": { "separator": "==================================", "max_tokens": 1999, "chunk_overlap": 300 }, "parent_mode":"paragraph", "subchunk_segmentation": { "separator": "\\n", "max_tokens": 100, "chunk_overlap": 20 }, }, "mode": mode } # 启用父子级模式 doc_form==hierarchical_model and mode==hierarchical # 不启用父子级模式 doc_form==text_model and mode==custom data = { "name": text_name, "text": text, "indexing_technique": "high_quality", "doc_form": doc_form, "process_rule": process_rule, "doc_language": "Chinese", } try: response = requests.post(url=url, headers=headers, data=json.dumps(data), verify=False) response.raise_for_status() except Exception as e: logging.error(f"上传文档失败: {e}") raise idx = 0 while idx < 5: if self.get_document_indexing_status(dataset_id, response.json().get("batch")): break time.sleep(1) idx += 1 else: logging.warning("文档索引超时,可能需要手动检查。") return response.json().get("document", {}).get("id", "") def get_or_create_dataset_by_name(self, dataset_name: str, create_if_not_exist: bool=True) -> str: """ 通过名称获取或创建数据集。 :param dataset_name: 数据集名称。 :param create_if_not_exist: 如果数据集不存在是否创建。 :return: 数据集ID。 """ list_dataset = self.get_all_dataset_list() for dataset in list_dataset: if dataset["name"] == dataset_name: return dataset["id"] if create_if_not_exist: logging.info(f"数据集不存在,创建数据集: {dataset_name}") return self.create_dataset(dataset_name) else: raise Exception(f"数据集不存在: {dataset_name}") def get_documents(self, dataset_id: str, keyword: str = None) -> Dict[str, dict]: """ 获取指定数据集的所有文档。 :param dataset_id: 数据集ID。 :param keyword: 关键词,用于过滤文档。 :return: 文档字典,键为文档ID,值为文档信息。 """ url = f"{self.dify_url}/datasets/{dataset_id}/documents" headers = {"Authorization": f"Bearer {self.dify_dataset_api_key}"} limit = 100 page = 0 all_document = {} while True: page += 1 params = { 'keyword': keyword, 'page': page, 'limit': limit } response = requests.get(url, headers=headers, params=params, verify=False) if response.status_code == 200: data = response.json() parsed_data = {item["id"]: item for item in data['data']} all_document.update(parsed_data) if not data["has_more"]: break else: print(f"获取文档失败:{response.status_code},{response.text}") break return all_document def remove_dataset_all_doc(self, dataset_id: str): """ 删除数据集中所有的文档。 :param dataset_id: 数据集ID。 :raises: Exception 如果文档删除失败。 """ list_doc_id = self.get_documents(dataset_id=dataset_id) delete_url = f"{self.dify_url}/datasets/{dataset_id}/documents" headers = {"Authorization": f"Bearer {self.dify_dataset_api_key}"} # 删除每个文档 for doc_id in list_doc_id: response = requests.delete(url=f"{delete_url}/{doc_id}", headers=headers, verify=False) if response.status_code != 200: raise Exception(f"删除失败:{doc_id},状态码:{response.status_code},响应内容:{response.text}") print(f"已删除文档ID列表: {list_doc_id}") def create_dataset(self, dataset_name: str) -> str: """ 创建数据集。 :param dataset_name: 数据集名称。 :return: 创建的数据集ID。 :raises: Exception 如果数据集创建失败。 """ url = f'{self.dify_url}/datasets' headers = { 'Authorization': f'Bearer {self.dify_dataset_api_key}', 'Content-Type': 'application/json' } data = {'name': dataset_name, "indexing_technique": "high_quality", "permission": "all_team_members",} response = requests.post(url, headers=headers, data=json.dumps(data), verify=False) if response.status_code == 200: return response.json()['id'] else: raise Exception(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}") def get_document_id(self, dataset_id: str, document_name: str) -> str: """ 获取指定名称的文档ID。 :param dataset_id: 数据集ID。 :param document_name: 文档名称。 :return: 文档ID,如果未找到则返回空字符串。 """ # 获取所有文档 doc_info = self.get_documents(dataset_id, keyword=document_name) for doc_id, info in doc_info.items(): if info["name"].split('.')[0] == document_name: return doc_id print(f'获取文档ID失败。名称: {document_name}。原因:未找到文档ID') return '' def get_document_last_update_time(self, dataset_id: str, document_name: str) -> str: """ 获取指定文档的最后更新时间。 :param dataset_id: 数据集ID。 :param document_name: 文档名称。 :return: 最后更新时间字符串。 """ doc_dict = self.get_documents(dataset_id, keyword=document_name) if len(doc_dict) != 1: print(f'获取失败,{doc_dict}') return '' val = list(doc_dict.values()) return val[0]['created_at'] def del_document_by_name(self, dataset_id: str, document_name: str) -> bool: """ 通过文档名称删除文档。 :param dataset_id: 数据集ID。 :param document_name: 文档名称。 :return: 如果删除成功返回True,否则返回False。 """ document_id = self.get_document_id(dataset_id, document_name) if document_id == '': return False return self.del_document_by_id(dataset_id, document_id) def del_document_by_id(self, dataset_id: str, document_id: str) -> bool: """ 删除指定ID的文档。 :param dataset_id: 数据集ID。 :param document_id: 文档ID。 :return: 如果删除成功返回True,否则返回False。 """ url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}" headers = { 'Authorization': f'Bearer {self.dify_dataset_api_key}' } response = requests.delete(url, headers=headers, verify=False) if response.status_code == 200: print("Document deleted successfully.") return True else: print(f"Failed to delete document. Status code: {response.status_code}, Response: {response.text}") return False def get_all_dataset_list(self) -> List[dict]: """ 获取所有数据集列表。 :return: 数据集列表。 :raises: Exception 如果请求失败。 """ url = f'{self.dify_url}/datasets' params = { 'page': 1, 'limit': 100 } headers = { 'Authorization': f'Bearer {self.dify_dataset_api_key}' } response = requests.get(url, headers=headers, params=params, verify=False) if response.status_code == 200: return response.json()['data'] else: raise Exception(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}") def add_document_segments(self, dataset_id: str, document_id: str, segments_list: List[dict]) -> bool: """ 向文档添加段落。每次最多上传50条数据。 :param dataset_id: 数据集ID。 :param document_id: 文档ID。 :param segments_list: 段落列表。 :return: 如果所有段落添加成功则返回True,否则返回False。 :raises: Exception 如果请求失败。 """ url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments" headers = { 'Authorization': f'Bearer {self.dify_dataset_api_key}', 'Content-Type': 'application/json' } # 将segments_list按每50条数据分组 batch_size = 50 success = True for i in range(0, len(segments_list), batch_size): batch = segments_list[i:i + batch_size] segments = [ { "content": str(seg['content']), "answer": str(seg['answer']), "keywords": seg['keywords'] } for seg in batch ] data = {'segments': segments} try: response = requests.post(url, headers=headers, data=json.dumps(data), verify=False) if response.status_code == 200: response_data = response.json()['data'] if response_data[0]['status'] == 'error': logging.error(f"分段上传失败: {response_data[0]['error']}") success = False else: raise Exception(f"添加段落失败。状态码:{response.status_code},响应:{response.text}") # 添加短暂延时,避免请求过于频繁 time.sleep(0.5) except Exception as e: logging.error(f"上传段落批次 {i//batch_size + 1} 失败: {e}") success = False return success def get_document_segments(self, dataset_id: str, document_id: str) -> List[Dict]: """ 获取指定文档的分段信息。 :param dataset_id: 数据集ID。 :param document_id: 文档ID。 :return: 分段信息列表。 :raises: Exception 如果请求失败。 """ url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments" headers = { 'Authorization': f'Bearer {self.dify_dataset_api_key}', 'Content-Type': 'application/json' } limit = 100 page = 0 all_segments = [] try: while True: page += 1 params = { 'page': page, 'limit': limit } response = requests.get(url, headers=headers, params=params, verify=False) response.raise_for_status() # 如果响应状态码不是200,抛出异常 data = response.json() all_segments.extend(data.get("data", [])) if not data.get("has_more", False): break return all_segments except Exception as e: logging.error(f"获取文档分段失败: {e}") raise def update_document_segment( self, dataset_id: str, document_id: str, segment_id: str, content: str, answer: str, keywords: List[str], enabled: bool, regenerate_child_chunks: bool = True ) -> Dict: """ 更新指定文档的某个分段信息。 :param dataset_id: 数据集ID。 :param document_id: 文档ID。 :param segment_id: 分段ID。 :param content: 分段内容。 :param answer: 分段答案。 :param keywords: 分段关键词列表。 :param enabled: 是否启用分段。 :return: 更新后的分段信息。 :raises: Exception 如果请求失败。 """ url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments/{segment_id}" headers = { 'Authorization': f'Bearer {self.dify_dataset_api_key}', 'Content-Type': 'application/json' } # 构造请求数据 data = { "segment": { "content": content, "answer": answer, "keywords": keywords, "enabled": enabled, "regenerate_child_chunks": regenerate_child_chunks } } try: response = requests.post(url, headers=headers, data=json.dumps(data), verify=False) response.raise_for_status() # 如果响应状态码不是200,抛出异常 if response.json()["data"]['error']: logging.error(f"更新文档分段失败: {response.json()['data']['error']}") raise return response.json().get("data", {}) # 返回更新后的分段信息 except Exception as e: logging.error(f"更新文档分段失败: {e}") raise def upload_file(self, file_path: str, max_retries: int = 3) -> str: url = f"{self.dify_url}/files/upload" headers = { 'Authorization': f'Bearer {self.dify_app_api_key}' } # 获取文件的MIME类型 mime_type = mimetypes.guess_type(file_path)[0] if not mime_type: raise Exception(f"无法确定文件类型: {file_path}") # 读取文件内容 with open(file_path, 'rb') as file: file_content = file.read() for attempt in range(max_retries): try: files = { 'file': ( os.path.basename(file_path), file_content, mime_type ) } response = requests.post(url, headers=headers, files=files, verify=False) if response.ok: logging.info(f"上传文件成功,文件: {file_path}") return response.json()['id'] else: error_msg = f"上传文件失败,状态码: {response.status_code}, 响应: {response.text}" if attempt == max_retries - 1: # 最后一次尝试 raise Exception(error_msg) logging.warning(f"第{attempt + 1}次尝试失败: {error_msg}") time.sleep(1) # 重试前等待1秒 except Exception as e: if attempt == max_retries - 1: # 最后一次尝试 logging.error(f"上传文件失败: {e}") raise Exception(f"上传文件失败: {e}") logging.warning(f"第{attempt + 1}次尝试失败: {e}") time.sleep(1) # 重试前等待1秒 def add_document_child_chunk( self, dataset_id: str, document_id: str, segment_id: str, content: str ) -> Dict: """ 新增文档子分段。 :param dataset_id: 数据集ID。 :param document_id: 文档ID。 :param segment_id: 分段ID。 :param content: 子分段内容。 :return: 新增的子分段信息。 :raises: Exception 如果请求失败。 """ url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments/{segment_id}/child_chunks" headers = { 'Authorization': f'Bearer {self.dify_dataset_api_key}', 'Content-Type': 'application/json' } # 构造请求数据 data = { "content": content } try: response = requests.post(url, headers=headers, data=json.dumps(data), verify=False) response.raise_for_status() # 如果响应状态码不是200,抛出异常 return response.json().get("data", {}) # 返回新增的子分段信息 except Exception as e: logging.error(f"新增文档子分段失败: {e}") raise def get_document_child_chunks( self, dataset_id: str, document_id: str, segment_id: str, page: int = 1, limit: int = 100 ) -> Dict: """ 获取文档子分段列表。 :param dataset_id: 数据集ID。 :param document_id: 文档ID。 :param segment_id: 分段ID。 :param page: 页码,默认为1。 :param limit: 每页数量,默认为20。 :return: 子分段列表信息,包含分页信息。 :raises: Exception 如果请求失败。 """ url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments/{segment_id}/child_chunks" headers = { 'Authorization': f'Bearer {self.dify_dataset_api_key}' } params = { 'page': page, 'limit': limit } try: response = requests.get(url, headers=headers, params=params, verify=False) response.raise_for_status() # 如果响应状态码不是200,抛出异常 return response.json() # 返回子分段列表信息,包含分页信息 except Exception as e: logging.error(f"获取文档子分段列表失败: {e}") raise def del_document_child_chunk( self, dataset_id: str, document_id: str, segment_id: str, child_chunk_id: str ) -> bool: """ 删除文档子分段。 :param dataset_id: 数据集ID。 :param document_id: 文档ID。 :param segment_id: 分段ID。 :param child_chunk_id: 子分段ID。 :return: 如果删除成功返回True,否则返回False。 """ url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments/{segment_id}/child_chunks/{child_chunk_id}" headers = { 'Authorization': f'Bearer {self.dify_dataset_api_key}' } try: response = requests.delete(url, headers=headers, verify=False) if response.status_code == 200: logging.info(f"删除子分段成功: {child_chunk_id}") return True else: logging.error(f"删除子分段失败,状态码: {response.status_code}, 响应: {response.text}") return False except Exception as e: logging.error(f"删除子分段失败: {e}") return False def update_document_child_chunk( self, dataset_id: str, document_id: str, segment_id: str, child_chunk_id: str, content: str ) -> Dict: """ 更新文档子分段内容。 :param dataset_id: 数据集ID。 :param document_id: 文档ID。 :param segment_id: 分段ID。 :param child_chunk_id: 子分段ID。 :param content: 更新的子分段内容。 :return: 更新后的子分段信息。 :raises: Exception 如果请求失败。 """ url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments/{segment_id}/child_chunks/{child_chunk_id}" headers = { 'Authorization': f'Bearer {self.dify_dataset_api_key}', 'Content-Type': 'application/json' } # 构造请求数据 data = { "content": content } try: response = requests.patch(url, headers=headers, data=json.dumps(data), verify=False) response.raise_for_status() # 如果响应状态码不是200,抛出异常 return response.json().get("data", {}) # 返回更新后的子分段信息 except Exception as e: logging.error(f"更新文档子分段失败: {e}") raise if __name__ == '__main__': from dotenv import load_dotenv load_dotenv() d = DifyApi() id = d.upload_file(r"D:\Code\DataConvertUpload\wiki3todify\images\5fd27f31858f808f7659165628bfb8a7.png") print(id) # d.remove_dataset_all_doc("0b835829-4d47-4419-832f-3cd6d9510b87") # d.remove_dataset_all_doc("78abfb73-7e12-4dd4-92ff-b377b0235690") # d.remove_dataset_all_doc("841b890e-c769-4839-8314-70756c0bf3c1")