3.31 上传 dm rewrite

This commit is contained in:
Zdao032
2025-03-31 15:17:47 +08:00
commit b444310280
430 changed files with 39039 additions and 0 deletions
+308
View File
@@ -0,0 +1,308 @@
import os.path
import requests
import json
import time
from pathlib import Path
class WikijsTool:
BASE_URL = "http://10.1.16.39:8090/graphql"
HEADERS = {
"Authorization": "Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhcGkiOjcsImdycCI6MSwiaWF"
"0IjoxNzIzMDIwNzg4LCJleHAiOjE4MTc2OTM1ODgsImF1ZCI6InVybjp3aWtpLmpzIiwiaX"
"NzIjoidXJuOndpa2kuanMifQ.NSfE4tB7tkN8yapAs0CgkR-Yll6wc3gO3QGKMAv-TlGxx6A-9fJRmkwhRDTVMj_yPVG6"
"NXVy_AZpJtLapRXFGn0cvscsRJxq3fY1KgEyt8wO99jvd8DpNHpHhAIgrtyDelmHsBD2Wb5Ib3WJFsWC6d8Yhm9dkpx6tZ"
"vMAlFIKOg6UodMoMIry3YWiPGLaqJPQ0gcKmcnB2tC7sPXIIZnvfb5912GVM0n-4wvWobQnb_tXQuYZf99wH_leXjC_7BK8"
"8JSaAmB980i3rBxfejmaJ8E6D48zRxwwPFa0veVjjzRkVqHPwAjl1CXb2HE29pGtNmSEE1kLQVqOZD_ibOwKQ"
}
def __init__(self):
pass
@staticmethod
def init_url():
# 获取当前文件的路径
file_path = Path(__file__).resolve()
file_path = os.path.join(file_path.parent, 'wikiconfig.json')
if not os.path.exists(file_path):
return False
with open(file_path, 'r', encoding='utf-8') as file:
data = json.load(file)
if 'url' in data:
WikijsTool.BASE_URL = data['url']
if 'Authorization' in data:
WikijsTool.HEADERS['Authorization'] = data['Authorization']
return True
@staticmethod
def get_all_documents() -> list[dict]:
query = """
query Pages {
pages {
list {
path
locale
title
contentType
id
isPublished
}
}
}
"""
# 构建请求数据
data = {
'query': query,
}
# 发送 POST 请求
response = requests.post(WikijsTool.BASE_URL, headers=WikijsTool.HEADERS, json=data)
if response.status_code == 200:
# 解析数据
list_info = json.loads(response.content)['data']['pages']['list']
return [item for item in list_info]
else:
raise ValueError(f"获取文档列表失败,原因:“{response.text}")
@staticmethod
def get_all_doc_by_path(path: str, add_path_end: bool = True) -> list[dict]:
list_document = WikijsTool.get_all_documents()
all_document_list = []
if add_path_end:
temp_path = path + '/'
else:
temp_path = path
for document_info in list_document:
document_path = str(document_info["path"])
# 根据路径过滤出对应的所有文档
if not document_path.startswith(temp_path):
continue
all_document_list.append(document_info)
return all_document_list
ill_char = ['+', '.', '?', "%", '#', '&', '=', '<', '>', '"', '{', '}', '|', '^', '~', '[', ']', ' ']
@staticmethod
def judge_path_is_ill(path) -> (bool, str):
# 判断路径是否非法
set1 = set(WikijsTool.ill_char)
set2 = set(path)
# 判断两个集合是否有交集
if bool(set1 & set2):
new_list = WikijsTool.ill_char[:]
new_list.pop()
error = ', '.join(new_list) + ", 空格。"
return False, f"路径中包含非法字符,非法字符包括:{error}"
return True, ""
@staticmethod
def search_document(query_str: str) -> list[dict]:
graphql_query = f"""
query Pages {{
pages {{
search(query: "{query_str}") {{
results {{
id
path
locale
title
}}
}}
}}
}}
"""
# 构建请求数据
data = {
'query': graphql_query,
}
# 发送 POST 请求
response = requests.post(WikijsTool.BASE_URL, headers=WikijsTool.HEADERS, json=data)
if response.status_code == 200:
# 解析数据
search_results = json.loads(response.content)['data']['pages']['search']['results']
return search_results
else:
raise ValueError(f"查询文档失败,原因:“{response.text}")
@staticmethod
def query_doc_info(doc_id: int) -> dict:
query = """
query singlePages($doc_id: Int!) {
pages {
single(id: $doc_id) {
id
path
title
isPublished
content
contentType
isPrivate
updatedAt
createdAt
}
}
}
"""
# 构建请求数据
variables = {
'doc_id': doc_id,
}
data = {
'query': query,
'variables': variables
}
# 发送 POST 请求
response = requests.post(WikijsTool.BASE_URL, headers=WikijsTool.HEADERS, json=data)
if "errors" in response.text:
result = json.loads(response.content)['errors'][0]['message']
return {}
else:
return json.loads(response.content)['data']['pages']['single']
import html2text
import re
def to_markdown(input1):
html_content = """
<h1>Title</h1>
<p>This is a <strong>bold</strong> and <em>italic</em> text.</p>
<ul>
<li>Item 1</li>
<li>Item 2</li>
</ul>
"""
markdown_converter = html2text.HTML2Text()
markdown_converter.ignore_links = False # 保留链接
markdown_content = markdown_converter.handle(input1)
markdown_content = re.sub(r'!\[.*?\]\(.*?\)\s*[^\n]*', '', markdown_content)
return markdown_content
WikijsTool.init_url()
info = WikijsTool.get_all_documents()
# import csv
#
# # 定义 CSV 文件名
# csv_filename = "info_data2.26.csv"
#
# # 写入 CSV 文件
# with open(csv_filename, mode='w', newline='', encoding='utf-8') as file:
# writer = csv.writer(file)
#
# # 写入表头
# writer.writerow(['path', 'title', 'id'])
#
# # 写入数据
# for i in info:
# writer.writerow([i['path'], i['title'], i['id']])
#
# print(f"数据已保存到 {csv_filename}")
# print(info)
xizang_index = [8461, 8401, 8191, 8705, 8399, 8202, 8356, 8269, 8268, 8266, 8671, 8267, 8476, 8472, 8094, 8851, 8271, 8479,
8418, 8093, 8417, 8473, 8419, 8462, 8482, 8856, 8669, 8484, 8422, 8264, 8265, 8432, 7847, 8204, 8405, 8707,
8704, 8682, 8352, 8273, 8720, 8474, 8272, 8801, 8677, 8235, 8243, 8490, 8155, 8460, 8165, 8732, 8246, 8493,
8354, 8733, 8513, 8481, 8492, 8598, 8231, 8240, 8742, 8820, 8431, 8200, 8357, 8350, 8223, 8198, 8369, 8827,
7931, 8753, 8136, 8877, 8741, 8852, 8039, 8463, 8863, 8731, 8435, 8815, 7495, 8286, 8129, 7930, 7752, 8128,
7929, 8728, 8816, 8694, 8693, 8698, 8689, 8690, 7740, 7739, 8058, 8812, 8744, 8455, 7738, 7845, 7846, 8063,
8069, 8133, 7833, 8066, 7834, 8287, 8679, 8087, 7956, 8850, 8743, 8060, 8748, 8746, 8458, 8067, 8754, 8404,
8055, 8065, 8723, 8400, 8403, 8064, 8755, 8363, 8503, 7949, 7948, 8500, 8355, 8391, 8402, 8757, 8756, 8379,
8174, 8459, 8367, 8450, 8192, 8120, 8193, 8791, 8413, 8747, 8270, 8424, 8796, 8249, 8250, 8478, 8475, 8483,
8499, 8098, 8079, 8103, 8251, 8374, 8252, 8049, 8253, 8506, 8057, 7989, 8056, 7945, 8842, 8410, 8409, 8480,
8497, 8862, 8514, 8507, 8512, 8502, 7762, 8515, 8501, 8373, 8360, 8217, 8713, 8092, 8095, 8100, 8438, 8745,
8068, 8072, 8070, 8408, 8075, 8806, 8071, 8434, 8074, 8433, 8421, 8511, 8496, 8495]
down_index = [5362, 5409, 2500, 3926, 2389, 3611, 5430, 2166, 4296, 6545, 5396, 4080, 4171, 8836, 6785, 6448, 3745, 6029,
6846, 4718, 5250, 5898, 6924, 5341, 5286, 7387, 7133, 7437, 7395, 5484, 7273, 5279, 5254, 5271, 6419, 5143,
5779, 5454, 5139, 2608, 6587, 7277, 6835, 2148, 6308, 7564, 5234, 2382, 5752, 6169, 2503, 6551, 5104, 7230,
6537, 3849, 5771, 5190, 6503, 6534, 6122, 7178, 2641, 6301, 2427, 5410, 5494, 5493, 5422, 7034, 5121, 6257,
4114, 3764, 7446, 6601, 5385, 6041, 5423, 6961, 5151, 6592, 5394, 5303, 3757, 6925, 3747, 5474, 5027, 3759,
8831, 4784, 2604, 3573, 5119, 8861, 7016, 7013, 7014, 7565, 4990, 5926, 5380, 5135, 5345, 3570, 3854, 7566]
# a = WikijsTool.query_doc_info(8401)
# print(to_markdown(a['title']))
# print(to_markdown(a['content']))
# 读取 txt 文件
# with open("ceshi.txt", "r", encoding="utf-8") as file:
#
# lines = file.read().splitlines() # 按行读取并去除换行符
#
# # 转换为数字列表(int 或 float)
# numbers = [int(line) if line.isdigit() else float(line) for line in lines if line.strip()]
#
# # 输出列表
# print(numbers)
# 目标文件夹
# output_dir = "DM_data/xizang_kg"
# os.makedirs(output_dir, exist_ok=True) # 确保目录存在
#
# for doc_id in xizang_index:
# a = WikijsTool.query_doc_info(doc_id) # 获取文档信息
# title = to_markdown(a['title']).strip() # 获取标题
# content = to_markdown(a['content']).strip() # 获取内容
#
# if content: # 只有内容非空时才保存
# filename = f"{title}.txt"
# filepath = os.path.join(output_dir, filename)
#
# # 确保文件名合法(避免非法字符)
# filename = "".join(c if c.isalnum() or c in " _-" else "_" for c in filename)
# filepath = os.path.join(output_dir, filename)
#
# with open(filepath, "w", encoding="utf-8") as f:
# f.write(content)
#
# print(f"保存成功: {filepath}")
# else:
# print(f"跳过 ID {doc_id}: 内容为空")
import os
import re
# 目标文件夹
output_dir = "DM_data/down_kg"
os.makedirs(output_dir, exist_ok=True) # 确保目录存在
def sanitize_filename(filename):
"""去除或替换非法文件名字符"""
filename = re.sub(r'[<>:"/\\|?*]', '_', filename) # 替换非法字符
return filename.strip() # 去除首尾空格
for doc_id in down_index:
a = WikijsTool.query_doc_info(doc_id) # 获取文档信息
title = to_markdown(a['title']).strip() # 获取标题
content = to_markdown(a['content']).strip() # 获取内容
if content: # 只有内容非空时才保存
safe_title = sanitize_filename(title) # 处理非法字符
filename = f"{safe_title}.txt"
filepath = os.path.join(output_dir, filename)
with open(filepath, "w", encoding="utf-8") as f:
f.write(content)
print(f"保存成功: {filepath}")
else:
print(f"跳过 ID {doc_id}: 内容为空")