新增DifyApi类以支持与Dify API的交互,包含文档上传、索引状态获取、数据集管理等功能。同时优化DifyCompareTest.py中的缺失槽位判断逻辑,

This commit is contained in:
2025-07-02 15:53:51 +08:00
parent d835439fea
commit b8ccdcf287
3 changed files with 482 additions and 4 deletions
+2 -3
View File
@@ -572,9 +572,8 @@ content: "{content}"
slot_info_data = json.loads(slot_info)
else:
slot_info_data = slot_info
slot_missing = slot_info_data.get("slot_missing", None)
slot_missing_str = "完整" if not slot_missing else "缺失"
slot_missing = slot_info_data.get("missing_slots", {})
slot_missing_str = "完整" if len(slot_missing) == 0 else "缺失"
# 返回结果
return {
"问题": query,
+479
View File
@@ -0,0 +1,479 @@
import json
import logging
import os
import time
import mimetypes
import requests
from typing import Dict, List
import urllib3
# 禁用 SSL 警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class DifyApi:
"""
用于与Dify API进行交互的类。
"""
def __init__(self, dify_url: str="http://10.1.16.39/v1",
dify_dataset_api_key: str="dataset-skLjmPVonjHo119OWNf3kAmY",
dify_app_api_key: str="app-wUdkWJx5zeOvmvBUZizMoSw3"):
self.dify_url = dify_url
self.dify_dataset_api_key = dify_dataset_api_key
self.dify_app_api_key = dify_app_api_key
def get_document_indexing_status(self, datasets_id: str, batch: str) -> bool:
"""
获取文档的索引状态。
:param datasets_id: 数据集ID。
:param batch: 批次ID。
:return: 索引状态是否完成。
"""
url = f'{self.dify_url}/datasets/{datasets_id}/documents/{batch}/indexing-status'
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}'
}
try:
response = requests.get(url, headers=headers, verify=False)
response.raise_for_status()
return response.json().get("data", [{}])[0].get("indexing_status") == "completed"
except Exception as e:
logging.error(f"获取索引状态失败: {e}")
return False
def upload_text_to_document(self, text_name: str, text: str, dataset_id: str, enable_subchunk:bool=False) -> str:
"""
上传文本作为文档到Dify API并返回文档ID。
:param text_name: 文本文档的名称。
:param text: 文本文档的内容。
:param dataset_id: 数据集ID。
:return: 创建的文档ID。
:raises Exception: 如果文档创建失败。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/document/create_by_text"
headers = {
"Authorization": f"Bearer {self.dify_dataset_api_key}",
"Content-Type": "application/json"
}
if enable_subchunk:
doc_form='hierarchical_model'
mode='hierarchical'
else:
doc_form='text_model'
mode='custom'
process_rule = {
"rules": {
"pre_processing_rules": [
{"id": "remove_extra_spaces", "enabled": True},
{"id": "remove_urls_emails", "enabled": False}
],
"segmentation": {
"separator": "==================================",
"max_tokens": 1999,
"chunk_overlap": 300
},
"parent_mode":"paragraph",
"subchunk_segmentation": {
"separator": "\\n",
"max_tokens": 100,
"chunk_overlap": 20
},
},
"mode": mode
}
# 启用父子级模式 doc_form==hierarchical_model and mode==hierarchical
# 不启用父子级模式 doc_form==text_model and mode==custom
data = {
"name": text_name,
"text": text,
"indexing_technique": "high_quality",
"doc_form": doc_form,
"process_rule": process_rule,
"doc_language": "Chinese",
}
try:
response = requests.post(url=url, headers=headers, data=json.dumps(data), verify=False)
response.raise_for_status()
except Exception as e:
logging.error(f"上传文档失败: {e}")
raise
idx = 0
while idx < 5:
if self.get_document_indexing_status(dataset_id, response.json().get("batch")):
break
time.sleep(1)
idx += 1
else:
logging.warning("文档索引超时,可能需要手动检查。")
return response.json().get("document", {}).get("id", "")
def get_or_create_dataset_by_name(self, dataset_name: str) -> str:
"""
通过名称获取或创建数据集。
:param dataset_name: 数据集名称。
:return: 数据集ID。
"""
list_dataset = self.get_all_dataset_list()
for dataset in list_dataset:
if dataset["name"] == dataset_name:
return dataset["id"]
logging.info(f"数据集不存在,创建数据集: {dataset_name}")
return self.create_dataset(dataset_name)
def get_documents(self, dataset_id: str, keyword: str = None) -> Dict[str, dict]:
"""
获取指定数据集的所有文档。
:param dataset_id: 数据集ID。
:param keyword: 关键词,用于过滤文档。
:return: 文档字典,键为文档ID,值为文档信息。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/documents"
headers = {"Authorization": f"Bearer {self.dify_dataset_api_key}"}
limit = 100
page = 0
all_document = {}
while True:
page += 1
params = {
'keyword': keyword,
'page': page,
'limit': limit
}
response = requests.get(url, headers=headers, params=params, verify=False)
if response.status_code == 200:
data = response.json()
parsed_data = {item["id"]: item for item in data['data']}
all_document.update(parsed_data)
if not data["has_more"]:
break
else:
print(f"获取文档失败:{response.status_code}{response.text}")
break
return all_document
def remove_dataset_all_doc(self, dataset_id: str):
"""
删除数据集中所有的文档。
:param dataset_id: 数据集ID。
:raises: Exception 如果文档删除失败。
"""
list_doc_id = self.get_documents(dataset_id=dataset_id)
delete_url = f"{self.dify_url}/datasets/{dataset_id}/documents"
headers = {"Authorization": f"Bearer {self.dify_dataset_api_key}"}
# 删除每个文档
for doc_id in list_doc_id:
response = requests.delete(url=f"{delete_url}/{doc_id}", headers=headers, verify=False)
if response.status_code != 200:
raise Exception(f"删除失败:{doc_id},状态码:{response.status_code},响应内容:{response.text}")
print(f"已删除文档ID列表: {list_doc_id}")
def create_dataset(self, dataset_name: str) -> str:
"""
创建数据集。
:param dataset_name: 数据集名称。
:return: 创建的数据集ID。
:raises: Exception 如果数据集创建失败。
"""
url = f'{self.dify_url}/datasets'
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}',
'Content-Type': 'application/json'
}
data = {'name': dataset_name,
"indexing_technique": "high_quality",
"permission": "all_team_members",}
response = requests.post(url, headers=headers, data=json.dumps(data), verify=False)
if response.status_code == 200:
return response.json()['id']
else:
raise Exception(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
def get_document_id(self, dataset_id: str, document_name: str) -> str:
"""
获取指定名称的文档ID。
:param dataset_id: 数据集ID。
:param document_name: 文档名称。
:return: 文档ID,如果未找到则返回空字符串。
"""
# 获取所有文档
doc_info = self.get_documents(dataset_id, keyword=document_name)
for doc_id, info in doc_info.items():
if info["name"].split('.')[0] == document_name:
return doc_id
print(f'获取文档ID失败。名称: {document_name}。原因:未找到文档ID')
return ''
def get_document_last_update_time(self, dataset_id: str, document_name: str) -> str:
"""
获取指定文档的最后更新时间。
:param dataset_id: 数据集ID。
:param document_name: 文档名称。
:return: 最后更新时间字符串。
"""
doc_dict = self.get_documents(dataset_id, keyword=document_name)
if len(doc_dict) != 1:
print(f'获取失败,{doc_dict}')
return ''
val = list(doc_dict.values())
return val[0]['created_at']
def del_document_by_name(self, dataset_id: str, document_name: str) -> bool:
"""
通过文档名称删除文档。
:param dataset_id: 数据集ID。
:param document_name: 文档名称。
:return: 如果删除成功返回True,否则返回False。
"""
document_id = self.get_document_id(dataset_id, document_name)
if document_id == '':
return False
return self.del_document_by_id(dataset_id, document_id)
def del_document_by_id(self, dataset_id: str, document_id: str) -> bool:
"""
删除指定ID的文档。
:param dataset_id: 数据集ID。
:param document_id: 文档ID。
:return: 如果删除成功返回True,否则返回False。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}'
}
response = requests.delete(url, headers=headers, verify=False)
if response.status_code == 200:
print("Document deleted successfully.")
return True
else:
print(f"Failed to delete document. Status code: {response.status_code}, Response: {response.text}")
return False
def get_all_dataset_list(self) -> List[dict]:
"""
获取所有数据集列表。
:return: 数据集列表。
:raises: Exception 如果请求失败。
"""
url = f'{self.dify_url}/datasets'
params = {
'page': 1,
'limit': 100
}
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}'
}
response = requests.get(url, headers=headers, params=params, verify=False)
if response.status_code == 200:
return response.json()['data']
else:
raise Exception(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
def add_document_segments(self, dataset_id: str, document_id: str, segments_list: List[dict]) -> bool:
"""
向文档添加段落。每次最多上传50条数据。
:param dataset_id: 数据集ID。
:param document_id: 文档ID。
:param segments_list: 段落列表。
:return: 如果所有段落添加成功则返回True,否则返回False。
:raises: Exception 如果请求失败。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}',
'Content-Type': 'application/json'
}
# 将segments_list按每50条数据分组
batch_size = 50
success = True
for i in range(0, len(segments_list), batch_size):
batch = segments_list[i:i + batch_size]
segments = [
{
"content": str(seg['content']),
"answer": str(seg['answer']),
"keywords": seg['keywords']
}
for seg in batch
]
data = {'segments': segments}
try:
response = requests.post(url, headers=headers, data=json.dumps(data), verify=False)
if response.status_code == 200:
response_data = response.json()['data']
if response_data[0]['status'] == 'error':
logging.error(f"分段上传失败: {response_data[0]['error']}")
success = False
else:
raise Exception(f"添加段落失败。状态码:{response.status_code},响应:{response.text}")
# 添加短暂延时,避免请求过于频繁
time.sleep(0.5)
except Exception as e:
logging.error(f"上传段落批次 {i//batch_size + 1} 失败: {e}")
success = False
return success
def get_document_segments(self, dataset_id: str, document_id: str) -> List[Dict]:
"""
获取指定文档的分段信息。
:param dataset_id: 数据集ID。
:param document_id: 文档ID。
:return: 分段信息列表。
:raises: Exception 如果请求失败。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}',
'Content-Type': 'application/json'
}
try:
response = requests.get(url, headers=headers, verify=False)
response.raise_for_status() # 如果响应状态码不是200,抛出异常
return response.json().get("data", []) # 返回分段信息列表
except Exception as e:
logging.error(f"获取文档分段失败: {e}")
raise
def update_document_segment(
self,
dataset_id: str,
document_id: str,
segment_id: str,
content: str,
answer: str,
keywords: List[str],
enabled: bool
) -> Dict:
"""
更新指定文档的某个分段信息。
:param dataset_id: 数据集ID。
:param document_id: 文档ID。
:param segment_id: 分段ID。
:param content: 分段内容。
:param answer: 分段答案。
:param keywords: 分段关键词列表。
:param enabled: 是否启用分段。
:return: 更新后的分段信息。
:raises: Exception 如果请求失败。
"""
url = f"{self.dify_url}/datasets/{dataset_id}/documents/{document_id}/segments/{segment_id}"
headers = {
'Authorization': f'Bearer {self.dify_dataset_api_key}',
'Content-Type': 'application/json'
}
# 构造请求数据
data = {
"segment": {
"content": content,
"answer": answer,
"keywords": keywords,
"enabled": enabled,
"regenerate_child_chunks": True
}
}
try:
response = requests.post(url, headers=headers, data=json.dumps(data), verify=False)
response.raise_for_status() # 如果响应状态码不是200,抛出异常
if response.json()["data"]['error']:
logging.error(f"更新文档分段失败: {response.json()['data']['error']}")
raise
return response.json().get("data", {}) # 返回更新后的分段信息
except Exception as e:
logging.error(f"更新文档分段失败: {e}")
raise
def upload_file(self, file_path: str, max_retries: int = 3) -> str:
url = f"{self.dify_url}/files/upload"
headers = {
'Authorization': f'Bearer {self.dify_app_api_key}'
}
# 获取文件的MIME类型
mime_type = mimetypes.guess_type(file_path)[0]
if not mime_type:
raise Exception(f"无法确定文件类型: {file_path}")
# 读取文件内容
with open(file_path, 'rb') as file:
file_content = file.read()
for attempt in range(max_retries):
try:
files = {
'file': (
os.path.basename(file_path),
file_content,
mime_type
)
}
response = requests.post(url, headers=headers, files=files, verify=False)
if response.ok:
logging.info(f"上传文件成功,文件: {file_path}")
return response.json()['id']
else:
error_msg = f"上传文件失败,状态码: {response.status_code}, 响应: {response.text}"
if attempt == max_retries - 1: # 最后一次尝试
raise Exception(error_msg)
logging.warning(f"{attempt + 1}次尝试失败: {error_msg}")
time.sleep(1) # 重试前等待1秒
except Exception as e:
if attempt == max_retries - 1: # 最后一次尝试
logging.error(f"上传文件失败: {e}")
raise Exception(f"上传文件失败: {e}")
logging.warning(f"{attempt + 1}次尝试失败: {e}")
time.sleep(1) # 重试前等待1秒
if __name__ == '__main__':
from dotenv import load_dotenv
load_dotenv()
d = DifyApi()
id = d.upload_file(r"D:\Code\DataConvertUpload\wiki3todify\images\5fd27f31858f808f7659165628bfb8a7.png")
print(id)
# d.remove_dataset_all_doc("0b835829-4d47-4419-832f-3cd6d9510b87")
# d.remove_dataset_all_doc("78abfb73-7e12-4dd4-92ff-b377b0235690")
# d.remove_dataset_all_doc("841b890e-c769-4839-8314-70756c0bf3c1")