Files
QueryRewrite/rag2_0/dify/dify_client/dify_api.py
T

735 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import logging
import os
import time
import mimetypes
import requests
from typing import Dict, List
import urllib3
# 禁用 SSL 警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class DifyApi:
"""
用于与Dify API进行交互的类。
"""
def __init__(self, dify_url: str=None,
dify_dataset_api_key: str=None,
dify_app_api_key: str=None):
self.dify_url = dify_url if dify_url else os.environ.get('DIFY_BSAE_URL')
self.dify_dataset_api_key = dify_dataset_api_key if dify_dataset_api_key else os.environ.get('DIFY_DATASET_KEY')
self.dify_app_api_key = dify_app_api_key if dify_app_api_key else os.environ.get('DIFY_APP_KEY')
def get_document_indexing_status(self, datasets_id: str, batch: str) -> bool:
"""
获取文档的索引状态。
:param datasets_id: 数据集ID。
:param batch: 批次ID。
:return: 索引状态是否完成。
"""
url = f'{self.dify_url}/datasets/{datasets_id}/documents/{batch}/indexing-status'
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}'
}
try:
response = requests.get(url, headers=headers, verify=False)
response.raise_for_status()
return response.json().get("data", [{}])[0].get("indexing_status") == "completed"
except Exception as e:
logging.error(f"获取索引状态失败: {e}")
return False
def upload_text_to_document(self, text_name: str, text: str, dataset_id: str, enable_subchunk:bool=False) -> str:
"""
上传文本作为文档到Dify API并返回文档ID。
:param text_name: 文本文档的名称。
:param text: 文本文档的内容。
:param dataset_id: 数据集ID。
:return: 创建的文档ID。
:raises Exception: 如果文档创建失败。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/document/create_by_text"
headers = {
"Authorization": f"Bearer {self.dify_dataset_api_key}",
"Content-Type": "application/json"
}
if enable_subchunk:
doc_form='hierarchical_model'
mode='hierarchical'
else:
doc_form='text_model'
mode='custom'
process_rule = {
"rules": {
"pre_processing_rules": [
{"id": "remove_extra_spaces", "enabled": True},
{"id": "remove_urls_emails", "enabled": False}
],
"segmentation": {
"separator": "==================================",
"max_tokens": 1999,
"chunk_overlap": 300
},
"parent_mode":"paragraph",
"subchunk_segmentation": {
"separator": "\\n",
"max_tokens": 100,
"chunk_overlap": 20
},
},
"mode": mode
}
# 启用父子级模式 doc_form==hierarchical_model and mode==hierarchical
# 不启用父子级模式 doc_form==text_model and mode==custom
data = {
"name": text_name,
"text": text,
"indexing_technique": "high_quality",
"doc_form": doc_form,
"process_rule": process_rule,
"doc_language": "Chinese",
}
try:
response = requests.post(url=url, headers=headers, data=json.dumps(data), verify=False)
response.raise_for_status()
except Exception as e:
logging.error(f"上传文档失败: {e}")
raise
idx = 0
while idx < 5:
if self.get_document_indexing_status(dataset_id, response.json().get("batch")):
break
time.sleep(5)
idx += 1
else:
logging.warning("文档索引超时,可能需要手动检查。")
return response.json().get("document", {}).get("id", "")
def get_or_create_dataset_by_name(self, dataset_name: str, create_if_not_exist: bool=True) -> str:
"""
通过名称获取或创建数据集。
:param dataset_name: 数据集名称。
:param create_if_not_exist: 如果数据集不存在是否创建。
:return: 数据集ID。
"""
list_dataset = self.get_all_dataset_list()
for dataset in list_dataset:
if dataset["name"] == dataset_name:
return dataset["id"]
if create_if_not_exist:
logging.info(f"数据集不存在,创建数据集: {dataset_name}")
return self.create_dataset(dataset_name)
else:
raise Exception(f"数据集不存在: {dataset_name}")
def get_documents(self, dataset_id: str, keyword: str = None) -> Dict[str, dict]:
"""
获取指定数据集的所有文档。
:param dataset_id: 数据集ID。
:param keyword: 关键词,用于过滤文档。
:return: 文档字典,键为文档ID,值为文档信息。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/documents"
headers = {"Authorization": f"Bearer {self.dify_dataset_api_key}"}
limit = 100
page = 0
all_document = {}
while True:
page += 1
params = {
'keyword': keyword,
'page': page,
'limit': limit
}
response = requests.get(url, headers=headers, params=params, verify=False)
if response.status_code == 200:
data = response.json()
parsed_data = {item["id"]: item for item in data['data']}
all_document.update(parsed_data)
if not data["has_more"]:
break
else:
print(f"获取文档失败:{response.status_code}{response.text}")
break
return all_document
def remove_dataset_all_doc(self, dataset_id: str):
"""
删除数据集中所有的文档。
:param dataset_id: 数据集ID。
:raises: Exception 如果文档删除失败。
"""
list_doc_id = self.get_documents(dataset_id=dataset_id)
delete_url = f"{self.dify_url}/datasets/{dataset_id}/documents"
headers = {"Authorization": f"Bearer {self.dify_dataset_api_key}"}
# 删除每个文档
for doc_id in list_doc_id:
response = requests.delete(url=f"{delete_url}/{doc_id}", headers=headers, verify=False)
if response.status_code != 200:
raise Exception(f"删除失败:{doc_id},状态码:{response.status_code},响应内容:{response.text}")
print(f"已删除文档ID列表: {list_doc_id}")
def create_dataset(self, dataset_name: str) -> str:
"""
创建数据集。
:param dataset_name: 数据集名称。
:return: 创建的数据集ID。
:raises: Exception 如果数据集创建失败。
"""
url = f'{self.dify_url}/datasets'
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}',
'Content-Type': 'application/json'
}
data = {'name': dataset_name,
"indexing_technique": "high_quality",
"permission": "all_team_members",}
response = requests.post(url, headers=headers, data=json.dumps(data), verify=False)
if response.status_code == 200:
return response.json()['id']
else:
raise Exception(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
def get_document_id(self, dataset_id: str, document_name: str) -> str:
"""
获取指定名称的文档ID。
:param dataset_id: 数据集ID。
:param document_name: 文档名称。
:return: 文档ID,如果未找到则返回空字符串。
"""
# 获取所有文档
doc_info = self.get_documents(dataset_id, keyword=document_name)
for doc_id, info in doc_info.items():
if info["name"] == document_name:
return doc_id
return ''
def add_document_metadata(self, dataset_id: str, document_id: str, metadata_list: List[Dict]) -> bool:
"""
为文档添加元数据。
:param dataset_id: 数据集ID。
:param document_id: 文档ID。
:param metadata_list: 元数据列表,每项包含id、name和value。
:return: 如果添加成功返回True,否则返回False。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/documents/metadata"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}',
'Content-Type': 'application/json'
}
data = {
"operation_data": [
{
"document_id": document_id,
"metadata_list": metadata_list
}
]
}
try:
response = requests.post(url, headers=headers, data=json.dumps(data), verify=False)
if response.status_code == 200:
logging.info(f"成功为文档 {document_id} 添加元数据")
return True
else:
logging.error(f"添加元数据失败,状态码: {response.status_code}, 响应: {response.text}")
return False
except Exception as e:
logging.error(f"添加元数据请求失败: {e}")
return False
def get_dataset_metadata(self, dataset_id: str) -> List[Dict]:
"""
获取数据集的元数据。
:param dataset_id: 数据集ID。
:return: 元数据列表,如果获取失败则返回空列表。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/metadata"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}'
}
try:
response = requests.get(url, headers=headers, verify=False)
if response.status_code == 200:
return response.json()
else:
logging.error(f"获取数据集元数据失败,状态码: {response.status_code}, 响应: {response.text}")
return []
except Exception as e:
logging.error(f"获取数据集元数据请求失败: {e}")
return []
def create_dataset_metadata(self, dataset_id: str, metadata_type: str, metadata_name: str) -> Dict:
"""
创建数据集元数据。
:param dataset_id: 数据集ID。
:param metadata_type: 元数据类型,如"string"。
:param metadata_name: 元数据名称。
:return: 创建的元数据信息,如果创建失败则返回空字典。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/metadata"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}',
'Content-Type': 'application/json'
}
data = {
"type": metadata_type,
"name": metadata_name
}
try:
response = requests.post(url, headers=headers, data=json.dumps(data), verify=False)
if response.json()["id"]:
logging.info(f"成功创建数据集元数据: {metadata_name}")
return response.json()
else:
logging.error(f"创建数据集元数据失败,状态码: {response.status_code}, 响应: {response.text}")
return {}
except Exception as e:
logging.error(f"创建数据集元数据请求失败: {e}")
return {}
def get_document_last_update_time(self, dataset_id: str, document_name: str) -> str:
"""
获取指定文档的最后更新时间。
:param dataset_id: 数据集ID。
:param document_name: 文档名称。
:return: 最后更新时间字符串。
"""
doc_dict = self.get_documents(dataset_id, keyword=document_name)
if len(doc_dict) != 1:
print(f'获取失败,{doc_dict}')
return ''
val = list(doc_dict.values())
return val[0]['created_at']
def del_document_by_name(self, dataset_id: str, document_name: str) -> bool:
"""
通过文档名称删除文档。
:param dataset_id: 数据集ID。
:param document_name: 文档名称。
:return: 如果删除成功返回True,否则返回False。
"""
document_id = self.get_document_id(dataset_id, document_name)
if document_id == '':
return False
return self.del_document_by_id(dataset_id, document_id)
def del_document_by_id(self, dataset_id: str, document_id: str) -> bool:
"""
删除指定ID的文档。
:param dataset_id: 数据集ID。
:param document_id: 文档ID。
:return: 如果删除成功返回True,否则返回False。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}'
}
response = requests.delete(url, headers=headers, verify=False)
if response.status_code == 200:
print("Document deleted successfully.")
return True
else:
print(f"Failed to delete document. Status code: {response.status_code}, Response: {response.text}")
return False
def get_all_dataset_list(self) -> List[dict]:
"""
获取所有数据集列表。
:return: 数据集列表。
:raises: Exception 如果请求失败。
"""
url = f'{self.dify_url}/datasets'
params = {
'page': 1,
'limit': 100
}
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}'
}
response = requests.get(url, headers=headers, params=params, verify=False)
if response.status_code == 200:
return response.json()['data']
else:
raise Exception(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
def add_document_segments(self, dataset_id: str, document_id: str, segments_list: List[dict]) -> bool:
"""
向文档添加段落。每次最多上传50条数据。
:param dataset_id: 数据集ID。
:param document_id: 文档ID。
:param segments_list: 段落列表。
:return: 如果所有段落添加成功则返回True,否则返回False。
:raises: Exception 如果请求失败。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}',
'Content-Type': 'application/json'
}
# 将segments_list按每50条数据分组
batch_size = 50
success = True
for i in range(0, len(segments_list), batch_size):
batch = segments_list[i:i + batch_size]
segments = [
{
"content": str(seg['content']),
"answer": str(seg['answer']),
"keywords": seg['keywords']
}
for seg in batch
]
data = {'segments': segments}
try:
response = requests.post(url, headers=headers, data=json.dumps(data), verify=False)
if response.status_code == 200:
response_data = response.json()['data']
if response_data[0]['status'] == 'error':
logging.error(f"分段上传失败: {response_data[0]['error']}")
success = False
else:
raise Exception(f"添加段落失败。状态码:{response.status_code},响应:{response.text}")
# 添加短暂延时,避免请求过于频繁
time.sleep(0.5)
except Exception as e:
logging.error(f"上传段落批次 {i//batch_size + 1} 失败: {e}")
success = False
return success
def get_document_segments(self, dataset_id: str, document_id: str) -> List[Dict]:
"""
获取指定文档的分段信息。
:param dataset_id: 数据集ID。
:param document_id: 文档ID。
:return: 分段信息列表。
:raises: Exception 如果请求失败。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}',
'Content-Type': 'application/json'
}
limit = 100
page = 0
all_segments = []
try:
while True:
page += 1
params = {
'page': page,
'limit': limit
}
response = requests.get(url, headers=headers, params=params, verify=False)
response.raise_for_status() # 如果响应状态码不是200,抛出异常
data = response.json()
all_segments.extend(data.get("data", []))
if not data.get("has_more", False):
break
return all_segments
except Exception as e:
logging.error(f"获取文档分段失败: {e}")
raise
def update_document_segment(
self,
dataset_id: str,
document_id: str,
segment_id: str,
content: str,
answer: str,
keywords: List[str],
enabled: bool,
regenerate_child_chunks: bool = True
) -> Dict:
"""
更新指定文档的某个分段信息。
:param dataset_id: 数据集ID。
:param document_id: 文档ID。
:param segment_id: 分段ID。
:param content: 分段内容。
:param answer: 分段答案。
:param keywords: 分段关键词列表。
:param enabled: 是否启用分段。
:return: 更新后的分段信息。
:raises: Exception 如果请求失败。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments/{segment_id}"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}',
'Content-Type': 'application/json'
}
# 构造请求数据
data = {
"segment": {
"content": content,
"answer": answer,
"keywords": keywords,
"enabled": enabled,
"regenerate_child_chunks": regenerate_child_chunks
}
}
try:
response = requests.post(url, headers=headers, data=json.dumps(data), verify=False)
response.raise_for_status() # 如果响应状态码不是200,抛出异常
if response.json()["data"]['error']:
logging.error(f"更新文档分段失败: {response.json()['data']['error']}")
raise
return response.json().get("data", {}) # 返回更新后的分段信息
except Exception as e:
logging.error(f"更新文档分段失败: {e}")
raise
def upload_file(self, file_path: str, max_retries: int = 3) -> str:
url = f"{self.dify_url}/files/upload"
headers = {
'Authorization': f'Bearer {self.dify_app_api_key}'
}
# 获取文件的MIME类型
mime_type = mimetypes.guess_type(file_path)[0]
if not mime_type:
raise Exception(f"无法确定文件类型: {file_path}")
# 读取文件内容
with open(file_path, 'rb') as file:
file_content = file.read()
for attempt in range(max_retries):
try:
files = {
'file': (
os.path.basename(file_path),
file_content,
mime_type
)
}
response = requests.post(url, headers=headers, files=files, verify=False)
if response.ok:
logging.info(f"上传文件成功,文件: {file_path}")
return response.json()['id']
else:
error_msg = f"上传文件失败,状态码: {response.status_code}, 响应: {response.text}"
if attempt == max_retries - 1: # 最后一次尝试
raise Exception(error_msg)
logging.warning(f"第{attempt + 1}次尝试失败: {error_msg}")
time.sleep(1) # 重试前等待1秒
except Exception as e:
if attempt == max_retries - 1: # 最后一次尝试
logging.error(f"上传文件失败: {e}")
raise Exception(f"上传文件失败: {e}")
logging.warning(f"第{attempt + 1}次尝试失败: {e}")
time.sleep(1) # 重试前等待1秒
def add_document_child_chunk(
self,
dataset_id: str,
document_id: str,
segment_id: str,
content: str
) -> Dict:
"""
新增文档子分段。
:param dataset_id: 数据集ID。
:param document_id: 文档ID。
:param segment_id: 分段ID。
:param content: 子分段内容。
:return: 新增的子分段信息。
:raises: Exception 如果请求失败。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments/{segment_id}/child_chunks"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}',
'Content-Type': 'application/json'
}
# 构造请求数据
data = {
"content": content
}
try:
response = requests.post(url, headers=headers, data=json.dumps(data), verify=False)
response.raise_for_status() # 如果响应状态码不是200,抛出异常
return response.json().get("data", {}) # 返回新增的子分段信息
except Exception as e:
logging.error(f"新增文档子分段失败: {e}")
raise
def get_document_child_chunks(
self,
dataset_id: str,
document_id: str,
segment_id: str,
page: int = 1,
limit: int = 100
) -> Dict:
"""
获取文档子分段列表。
:param dataset_id: 数据集ID。
:param document_id: 文档ID。
:param segment_id: 分段ID。
:param page: 页码,默认为1。
:param limit: 每页数量,默认为20。
:return: 子分段列表信息,包含分页信息。
:raises: Exception 如果请求失败。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments/{segment_id}/child_chunks"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}'
}
params = {
'page': page,
'limit': limit
}
try:
response = requests.get(url, headers=headers, params=params, verify=False)
response.raise_for_status() # 如果响应状态码不是200,抛出异常
return response.json() # 返回子分段列表信息,包含分页信息
except Exception as e:
logging.error(f"获取文档子分段列表失败: {e}")
raise
def del_document_child_chunk(
self,
dataset_id: str,
document_id: str,
segment_id: str,
child_chunk_id: str
) -> bool:
"""
删除文档子分段。
:param dataset_id: 数据集ID。
:param document_id: 文档ID。
:param segment_id: 分段ID。
:param child_chunk_id: 子分段ID。
:return: 如果删除成功返回True,否则返回False。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments/{segment_id}/child_chunks/{child_chunk_id}"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}'
}
try:
response = requests.delete(url, headers=headers, verify=False)
if response.status_code == 200:
logging.info(f"删除子分段成功: {child_chunk_id}")
return True
else:
logging.error(f"删除子分段失败,状态码: {response.status_code}, 响应: {response.text}")
return False
except Exception as e:
logging.error(f"删除子分段失败: {e}")
return False
def update_document_child_chunk(
self,
dataset_id: str,
document_id: str,
segment_id: str,
child_chunk_id: str,
content: str
) -> Dict:
"""
更新文档子分段内容。
:param dataset_id: 数据集ID。
:param document_id: 文档ID。
:param segment_id: 分段ID。
:param child_chunk_id: 子分段ID。
:param content: 更新的子分段内容。
:return: 更新后的子分段信息。
:raises: Exception 如果请求失败。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments/{segment_id}/child_chunks/{child_chunk_id}"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}',
'Content-Type': 'application/json'
}
# 构造请求数据
data = {
"content": content
}
try:
response = requests.patch(url, headers=headers, data=json.dumps(data), verify=False)
response.raise_for_status() # 如果响应状态码不是200,抛出异常
return response.json().get("data", {}) # 返回更新后的子分段信息
except Exception as e:
logging.error(f"更新文档子分段失败: {e}")
raise
if __name__ == '__main__':
from dotenv import load_dotenv
load_dotenv()
d = DifyApi()
# d.remove_dataset_all_doc("8673162d-0db1-4752-905e-ae3ef377a541")
# d.remove_dataset_all_doc("78abfb73-7e12-4dd4-92ff-b377b0235690")
# d.remove_dataset_all_doc("841b890e-c769-4839-8314-70756c0bf3c1")