"""Wiki.js GraphQL helper plus a one-off export script.

Running this file fetches the Wiki.js pages listed in ``down_index`` and
writes each page's content, converted to Markdown, into ``DM_data/down_kg``.
"""
import json
import os
import os.path
import re
import time
from pathlib import Path

import html2text
import requests


class WikijsTool:
    """Thin static wrapper around the Wiki.js GraphQL endpoint."""

    BASE_URL = "http://10.1.16.39:8090/graphql"
    # NOTE(security): a bearer token is hard-coded here. It is kept unchanged
    # for backward compatibility, but it should be rotated and supplied only
    # through ``wikiconfig.json`` (see ``init_url``) instead of source control.
    HEADERS = {
        "Authorization": "Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhcGkiOjcsImdycCI6MSwiaWF"
                         "0IjoxNzIzMDIwNzg4LCJleHAiOjE4MTc2OTM1ODgsImF1ZCI6InVybjp3aWtpLmpzIiwiaX"
                         "NzIjoidXJuOndpa2kuanMifQ.NSfE4tB7tkN8yapAs0CgkR-Yll6wc3gO3QGKMAv-TlGxx6A-9fJRmkwhRDTVMj_yPVG6"
                         "NXVy_AZpJtLapRXFGn0cvscsRJxq3fY1KgEyt8wO99jvd8DpNHpHhAIgrtyDelmHsBD2Wb5Ib3WJFsWC6d8Yhm9dkpx6tZ"
                         "vMAlFIKOg6UodMoMIry3YWiPGLaqJPQ0gcKmcnB2tC7sPXIIZnvfb5912GVM0n-4wvWobQnb_tXQuYZf99wH_leXjC_7BK8"
                         "8JSaAmB980i3rBxfejmaJ8E6D48zRxwwPFa0veVjjzRkVqHPwAjl1CXb2HE29pGtNmSEE1kLQVqOZD_ibOwKQ"
    }
    # Seconds passed to ``requests.post``; without a timeout a stalled server
    # would block the script indefinitely.
    TIMEOUT = 30

    # Characters that are not allowed in a page path. The trailing space is
    # last on purpose: ``judge_path_is_ill`` drops it and spells it out.
    ill_char = ['+', '.', '?', "%", '#', '&', '=', '<', '>', '"', '{', '}', '|', '^', '~', '[', ']', ' ']

    def __init__(self):
        pass

    @staticmethod
    def _post(payload: dict) -> "requests.Response":
        """POST a GraphQL payload to ``BASE_URL`` with the shared headers."""
        return requests.post(
            WikijsTool.BASE_URL,
            headers=WikijsTool.HEADERS,
            json=payload,
            timeout=WikijsTool.TIMEOUT,
        )

    @staticmethod
    def init_url() -> bool:
        """Load endpoint settings from ``wikiconfig.json`` next to this file.

        The optional keys ``url`` and ``Authorization`` override ``BASE_URL``
        and the ``Authorization`` header respectively.

        Returns:
            ``True`` if the config file exists and was read, ``False`` if it
            does not exist.
        """
        config_path = Path(__file__).resolve().parent / 'wikiconfig.json'
        if not config_path.exists():
            return False
        with config_path.open('r', encoding='utf-8') as file:
            data = json.load(file)
        if 'url' in data:
            WikijsTool.BASE_URL = data['url']
        if 'Authorization' in data:
            WikijsTool.HEADERS['Authorization'] = data['Authorization']
        return True

    @staticmethod
    def get_all_documents() -> list[dict]:
        """Return the page list (path, locale, title, contentType, id, isPublished).

        Raises:
            ValueError: if the server does not answer with HTTP 200.
        """
        query = """
            query Pages {
                pages {
                    list {
                        path
                        locale
                        title
                        contentType
                        id
                        isPublished
                    }
                }
            }
        """
        response = WikijsTool._post({'query': query})
        if response.status_code != 200:
            raise ValueError(f"获取文档列表失败,原因:“{response.text}")
        return list(json.loads(response.content)['data']['pages']['list'])

    @staticmethod
    def get_all_doc_by_path(path: str, add_path_end: bool = True) -> list[dict]:
        """Return every listed page whose path starts with ``path``.

        Args:
            path: path prefix to match.
            add_path_end: when true, ``'/'`` is appended to ``path`` first so
                only pages below that directory match.
        """
        prefix = path + '/' if add_path_end else path
        return [
            document_info
            for document_info in WikijsTool.get_all_documents()
            if str(document_info["path"]).startswith(prefix)
        ]

    @staticmethod
    def judge_path_is_ill(path) -> tuple[bool, str]:
        """Check ``path`` for forbidden characters.

        Returns:
            ``(True, "")`` when the path is acceptable, otherwise ``False``
            and a message listing the forbidden characters.
        """
        if set(WikijsTool.ill_char) & set(path):
            # List every forbidden character except the trailing space, which
            # is named explicitly so it is visible in the message.
            error = ', '.join(WikijsTool.ill_char[:-1]) + ", 空格。"
            return False, f"路径中包含非法字符,非法字符包括:{error}"
        return True, ""

    @staticmethod
    def search_document(query_str: str) -> list[dict]:
        """Search pages and return the result entries (id, path, locale, title).

        The search text is sent as a GraphQL variable rather than spliced into
        the query document, so quotes or backslashes in ``query_str`` cannot
        break or alter the query.

        Raises:
            ValueError: if the server does not answer with HTTP 200.
        """
        graphql_query = """
            query Pages($query: String!) {
                pages {
                    search(query: $query) {
                        results {
                            id
                            path
                            locale
                            title
                        }
                    }
                }
            }
        """
        payload = {
            'query': graphql_query,
            'variables': {'query': query_str},
        }
        response = WikijsTool._post(payload)
        if response.status_code != 200:
            raise ValueError(f"查询文档失败,原因:“{response.text}")
        return json.loads(response.content)['data']['pages']['search']['results']

    @staticmethod
    def query_doc_info(doc_id: int) -> dict:
        """Fetch a single page by id.

        Returns:
            The page fields (id, path, title, isPublished, content,
            contentType, isPrivate, updatedAt, createdAt), or an empty dict
            when the response reports errors or contains no page.
        """
        query = """
            query singlePages($doc_id: Int!) {
                pages {
                    single(id: $doc_id) {
                        id
                        path
                        title
                        isPublished
                        content
                        contentType
                        isPrivate
                        updatedAt
                        createdAt
                    }
                }
            }
        """
        payload = {
            'query': query,
            'variables': {'doc_id': doc_id},
        }
        response = WikijsTool._post(payload)
        body = json.loads(response.content)
        # Inspect the parsed payload, not the raw text: a page whose content
        # merely contains the word "errors" must not be treated as a failure.
        if not isinstance(body, dict) or body.get('errors'):
            return {}
        pages = (body.get('data') or {}).get('pages') or {}
        return pages.get('single') or {}


# Matches a Markdown image and whatever follows it on the same line.
_IMAGE_LINE_RE = re.compile(r'!\[.*?\]\(.*?\)\s*[^\n]*')


def to_markdown(input1):
    """Convert HTML text to Markdown, keeping links and dropping images."""
    markdown_converter = html2text.HTML2Text()
    markdown_converter.ignore_links = False  # keep hyperlinks in the output
    markdown_content = markdown_converter.handle(input1)
    return _IMAGE_LINE_RE.sub('', markdown_content)


def sanitize_filename(filename):
    """Replace characters that are illegal in file names with ``_`` and strip."""
    return re.sub(r'[<>:"/\\|?*]', '_', filename).strip()


def export_documents(doc_ids, target_dir):
    """Save each document's Markdown content as ``<title>.txt`` in ``target_dir``.

    Documents that cannot be fetched or whose content is empty are skipped
    with a message instead of aborting the whole run.
    """
    os.makedirs(target_dir, exist_ok=True)  # make sure the directory exists
    for doc_id in doc_ids:
        doc = WikijsTool.query_doc_info(doc_id)
        if not doc:
            print(f"跳过 ID {doc_id}: 获取文档失败")
            continue

        title = to_markdown(doc.get('title') or '').strip()
        content = to_markdown(doc.get('content') or '').strip()
        if not content:  # only save documents that have content
            print(f"跳过 ID {doc_id}: 内容为空")
            continue

        # Fall back to the id so a blank title does not produce ".txt".
        safe_title = sanitize_filename(title) or str(doc_id)
        filepath = os.path.join(target_dir, f"{safe_title}.txt")
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(content)
        print(f"保存成功: {filepath}")


xizang_index = [
    8461, 8401, 8191, 8705, 8399, 8202, 8356, 8269, 8268, 8266,
    8671, 8267, 8476, 8472, 8094, 8851, 8271, 8479, 8418, 8093,
    8417, 8473, 8419, 8462, 8482, 8856, 8669, 8484, 8422, 8264,
    8265, 8432, 7847, 8204, 8405, 8707, 8704, 8682, 8352, 8273,
    8720, 8474, 8272, 8801, 8677, 8235, 8243, 8490, 8155, 8460,
    8165, 8732, 8246, 8493, 8354, 8733, 8513, 8481, 8492, 8598,
    8231, 8240, 8742, 8820, 8431, 8200, 8357, 8350, 8223, 8198,
    8369, 8827, 7931, 8753, 8136, 8877, 8741, 8852, 8039, 8463,
    8863, 8731, 8435, 8815, 7495, 8286, 8129, 7930, 7752, 8128,
    7929, 8728, 8816, 8694, 8693, 8698, 8689, 8690, 7740, 7739,
    8058, 8812, 8744, 8455, 7738, 7845, 7846, 8063, 8069, 8133,
    7833, 8066, 7834, 8287, 8679, 8087, 7956, 8850, 8743, 8060,
    8748, 8746, 8458, 8067, 8754, 8404, 8055, 8065, 8723, 8400,
    8403, 8064, 8755, 8363, 8503, 7949, 7948, 8500, 8355, 8391,
    8402, 8757, 8756, 8379, 8174, 8459, 8367, 8450, 8192, 8120,
    8193, 8791, 8413, 8747, 8270, 8424, 8796, 8249, 8250, 8478,
    8475, 8483, 8499, 8098, 8079, 8103, 8251, 8374, 8252, 8049,
    8253, 8506, 8057, 7989, 8056, 7945, 8842, 8410, 8409, 8480,
    8497, 8862, 8514, 8507, 8512, 8502, 7762, 8515, 8501, 8373,
    8360, 8217, 8713, 8092, 8095, 8100, 8438, 8745, 8068, 8072,
    8070, 8408, 8075, 8806, 8071, 8434, 8074, 8433, 8421, 8511,
    8496, 8495,
]

down_index = [
    5362, 5409, 2500, 3926, 2389, 3611, 5430, 2166, 4296, 6545,
    5396, 4080, 4171, 8836, 6785, 6448, 3745, 6029, 6846, 4718,
    5250, 5898, 6924, 5341, 5286, 7387, 7133, 7437, 7395, 5484,
    7273, 5279, 5254, 5271, 6419, 5143, 5779, 5454, 5139, 2608,
    6587, 7277, 6835, 2148, 6308, 7564, 5234, 2382, 5752, 6169,
    2503, 6551, 5104, 7230, 6537, 3849, 5771, 5190, 6503, 6534,
    6122, 7178, 2641, 6301, 2427, 5410, 5494, 5493, 5422, 7034,
    5121, 6257, 4114, 3764, 7446, 6601, 5385, 6041, 5423, 6961,
    5151, 6592, 5394, 5303, 3757, 6925, 3747, 5474, 5027, 3759,
    8831, 4784, 2604, 3573, 5119, 8861, 7016, 7013, 7014, 7565,
    4990, 5926, 5380, 5135, 5345, 3570, 3854, 7566,
]

# Target folder for the export below.
output_dir = "DM_data/down_kg"

# NOTE(review): the steps below run at import time, exactly as before.
# Consider an ``if __name__ == "__main__":`` guard once nothing imports this
# file for its side effects.
WikijsTool.init_url()

# The full page listing is fetched up front, as in the original script; it is
# only needed when picking ids by hand.
info = WikijsTool.get_all_documents()
# print(info)

# To export the other id list, run:
# export_documents(xizang_index, "DM_data/xizang_kg")
export_documents(down_index, output_dir)