上传问题改写、意图识别模块代码
This commit is contained in:
@@ -0,0 +1,256 @@
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
from typing import List, Optional, Dict
|
||||
from threading import Lock
|
||||
|
||||
API_KEY_LIST=[
|
||||
"sk-xxaiabmfhzwwpijuledllkmkzhzwsqeicjxmjwnvriqpwmpk",
|
||||
"sk-lldcprpqjhgdimiwewgbthngfbrazhkiuioubmaatrcpjjum",
|
||||
"sk-bppugibbtvujomvoysnbcdzpcwndxtwrkfvmgbkbzcmobdon",
|
||||
"sk-hnqitgdlfrrnpimcfxigqibstqquintnzpiidsshpajjyxqd",
|
||||
"sk-hrojkkkrrkmsajtnizokbcgexsfggdiqavbtvbayuwqbnmom",
|
||||
"sk-kkdklmnyompoiotzkfqahpayzlkgogfudjkyaebehtsowvid",
|
||||
"sk-sfxzvllifafbyfduupcdtcrjwhdyiyojnksyopnfslurnhsp",
|
||||
"sk-faqirxiszukfswqvzqawxnemqfacrkyurbxxkzwbbujqacdp",
|
||||
"sk-vonaanuueqiczppkntjuphateshrcpqpnvxmwxorkyihjmrb",
|
||||
"sk-qfpeoodgupcukcdstjcxgegwxnuhtxkkrupkogkcvhavxgny",
|
||||
"sk-fsvjnbpfgoadixympaabaukupuhjvbturcbxaqfdzjznemtr",
|
||||
"sk-fltvnbiqntfawjwkfnnhmyfiimzgzxkweqmefcfqkbucwrhi",
|
||||
"sk-oosswdriwyqkglwdigvcxgmcpyplcyowicbaugpizoscevdl",
|
||||
"sk-jswtxhkiralnyiukqimtyuurcaepulxdrfijadtxzrgsajyc",
|
||||
"sk-dcjuhoukdyrbneadtxtnyxzmigkpiqgtqqnreiprxpioftsv",
|
||||
"sk-yrhezyuxjblpaxzzudbowqmvcoxcammupcubghbodolikbdk",
|
||||
"sk-dsgvwpfagmarilmnewwbzhfzlqehburoupjaopucdvybpbdo",
|
||||
"sk-oljjlspuaurtoczyekztiidwtoerugadgepiufclpmrbdfqc",
|
||||
"sk-crgrimubjesthvxuqwedqqdoetljyrgeahxxpctfefgnkpyo",
|
||||
"sk-tubqhwgycxrdhwsqzjopxgeaqpsjdfppckckayvzornaluwq",
|
||||
"sk-amcxlmsdnadptpnehqnkvseolacipztmvovnmxojzohbjjil",
|
||||
"sk-pdyymhshpzmdduwxsezthnrgarnnhgzvmiflbpisfzxkiayt",
|
||||
"sk-qhwoorywmejumyudfxbrkegxtqifsbgcdkmpjckezepgyqnz",
|
||||
"sk-cpoctrgcnstaybeyuieuwjdgeakudhqdnnwdjavjudcbvvem",
|
||||
]
|
||||
|
||||
class APIKeyManager:
    """Process-wide manager for a pool of API keys.

    Keys are read from an environment variable that may hold several keys
    joined by ``separator``.  When that variable is unset or contains no
    usable key, the module-level ``API_KEY_LIST`` is used as a legacy
    fallback.  Keys can be handed out round-robin or at random, and a usage
    counter plus last-used timestamp is kept per key.
    """

    # Singleton instance, and the lock guarding its creation and key selection.
    _instance = None
    _lock = Lock()

    # Per-key usage statistics: {key: {"count": int, "last_used": float}}.
    _key_usage: Dict[str, Dict] = {}
    # Index of the key the next round-robin request will receive.
    _current_index = 0

    @classmethod
    def get_instance(cls, env_var_name: str = "OPENAI_API_KEY", separator: str = ";"):
        """Return the singleton, creating it on first use.

        Args:
            env_var_name: Environment variable holding the keys.
            separator: String separating multiple keys in that variable.

        Returns:
            The shared APIKeyManager instance.  The arguments only take
            effect on the call that actually creates the instance.
        """
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have won the race.
                if cls._instance is None:
                    cls._instance = cls(env_var_name, separator)
        return cls._instance

    @classmethod
    def get_api_key(cls) -> Optional[str]:
        """Return the next key in round-robin order, or None if there are no keys."""
        return cls.get_instance()._get_next_api_key()

    @classmethod
    def get_random_api_key(cls) -> Optional[str]:
        """Return a randomly chosen key, or None if there are no keys."""
        return cls.get_instance()._get_random_api_key()

    @classmethod
    def get_valid_api_keys(cls) -> List[str]:
        """Probe every configured key and return the ones that are accepted.

        Each key is used for a one-token "ping" chat completion against
        https://api.siliconflow.cn/v1/chat/completions (8 second timeout).
        Keys answering with HTTP 200 are returned; the others are logged
        (masked) and left out of the result.  The manager's own key list is
        not modified.

        Returns:
            The keys that passed the probe, in configuration order.
        """
        # Imported lazily so the rest of the class works without `requests`.
        import logging
        import requests

        url = "https://api.siliconflow.cn/v1/chat/completions"
        payload = {
            "model": "deepseek-ai/DeepSeek-V3",
            "messages": [
                {"role": "user", "content": "ping"}
            ],
            "max_tokens": 1
        }

        valid_api_keys = []
        for key in cls.get_instance().get_all_api_keys():
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {key}",
            }
            # Never write the full secret to the log.
            masked = cls._mask_key(key)
            try:
                resp = requests.post(url, headers=headers, json=payload, timeout=8)
            except Exception as e:  # best-effort probe: any failure just excludes the key
                logging.warning(f"API密钥验证异常(被移除): {masked}, 错误: {e}")
                continue
            if resp.status_code == 200:
                valid_api_keys.append(key)
            else:
                logging.warning(f"API密钥无效(被移除): {masked}, 状态码: {resp.status_code}, 响应: {resp.text}")
        return valid_api_keys

    @classmethod
    def count(cls) -> int:
        """Return how many keys the singleton currently holds."""
        return len(cls.get_instance().api_keys)

    def __init__(self, env_var_name: str = "OPENAI_API_KEY", separator: str = ";"):
        """Load the keys and make sure each one has a usage-statistics entry.

        Args:
            env_var_name: Environment variable holding the keys.
            separator: String separating multiple keys in that variable.
        """
        self.env_var_name = env_var_name
        self.separator = separator
        self.api_keys = self._load_api_keys()

        for key in self.api_keys:
            self._key_usage.setdefault(key, {"count": 0, "last_used": 0})

    @staticmethod
    def _mask_key(key: str) -> str:
        """Return a log-safe rendering of *key* (its first six characters)."""
        return f"{key[:6]}..."

    def _load_api_keys(self) -> List[str]:
        """Return the configured keys.

        The environment variable ``self.env_var_name`` is split on
        ``self.separator``; surrounding whitespace and empty entries are
        dropped.  If that yields nothing, a copy of the module-level
        ``API_KEY_LIST`` is returned instead.

        Returns:
            A new list of key strings (possibly empty).
        """
        env_value = os.environ.get(self.env_var_name, "")
        api_keys = [key.strip() for key in env_value.split(self.separator) if key.strip()]
        if api_keys:
            return api_keys
        # Legacy fallback; copied so callers never alias the module constant.
        return list(API_KEY_LIST)

    def _record_usage(self, key: str) -> None:
        """Bump the usage counter and timestamp for *key* (caller holds the lock)."""
        usage = self._key_usage.setdefault(key, {"count": 0, "last_used": 0})
        usage["count"] += 1
        usage["last_used"] = time.time()

    def _get_next_api_key(self) -> Optional[str]:
        """Return the next key in round-robin order, starting with the first key.

        Returns:
            The selected key, or None when no keys are configured.
        """
        if not self.api_keys:
            return None

        with self._lock:
            index = self._current_index % len(self.api_keys)
            selected_key = self.api_keys[index]
            # Advance after selecting so the very first call yields key 0.
            self._current_index = (index + 1) % len(self.api_keys)
            self._record_usage(selected_key)

        return selected_key

    def _get_random_api_key(self) -> Optional[str]:
        """Return a randomly chosen key, or None when no keys are configured."""
        if not self.api_keys:
            return None

        with self._lock:
            selected_key = random.choice(self.api_keys)
            self._record_usage(selected_key)

        return selected_key

    def get_all_api_keys(self) -> List[str]:
        """Return a copy of the configured key list."""
        return self.api_keys.copy()

    def is_valid(self) -> bool:
        """Return True when at least one key is configured."""
        return len(self.api_keys) > 0

    def get_usage_stats(self) -> Dict:
        """Return a snapshot of the usage statistics.

        Returns:
            ``{key: {"count": ..., "last_used": ...}}``; both the outer and
            the inner dicts are copies, so editing the result does not
            change the manager's bookkeeping.
        """
        return {key: dict(usage) for key, usage in self._key_usage.items()}
|
||||
|
||||
|
||||
# 使用示例
|
||||
if __name__ == "__main__":
    # Usage example: probe the keys, then show the pool size and usage counters.
    usable_keys = APIKeyManager.get_valid_api_keys()
    header = "有效的API密钥列表:\n"
    print(header + "\n".join(usable_keys))

    # Total number of configured keys.
    total = APIKeyManager.count()
    print(f"总共有 {total} 个API密钥")

    # Per-key usage counters, with each key truncated to its first 5 characters.
    manager = APIKeyManager.get_instance()
    for api_key, usage in manager.get_usage_stats().items():
        print(f"密钥 {api_key[:5]}... 使用次数: {usage['count']}")
|
||||
@@ -0,0 +1,143 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
File: ModelTool.py
|
||||
Date: 2025-05-15
|
||||
Author: oyyz
|
||||
Description: 模型工具类
|
||||
"""
|
||||
|
||||
from openai import OpenAI
|
||||
import httpx
|
||||
import time
|
||||
import logging # 导入 logging 模块
|
||||
from langchain.embeddings.base import Embeddings
|
||||
from typing import List, Any
|
||||
import requests
|
||||
import os
|
||||
import logging
|
||||
from .APIKeyManager import APIKeyManager
|
||||
|
||||
class SiliconFlowEmbeddings(Embeddings):
    """LangChain ``Embeddings`` adapter for an OpenAI-style ``/v1/embeddings`` HTTP endpoint."""

    # Endpoint used when the caller does not pass ``url``.
    DEFAULT_URL = "http://10.1.16.39:9995/v1/embeddings"

    def __init__(self, api_key: str, model: str = "bge-m3",
                 url: str = DEFAULT_URL, timeout: float = 60.0):
        """Store the connection settings and build the request headers.

        Args:
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
            model: Model name placed in the request payload.
            url: Embeddings endpoint; defaults to ``DEFAULT_URL``.
            timeout: Seconds to wait for the HTTP call (passed to
                ``requests.post``); ``None`` waits indefinitely.
        """
        self.api_key = api_key
        self.model = model
        self.url = url
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

    def _embed(self, input: List[str]) -> List[List[float]]:
        """POST *input* to the endpoint and return one vector per text.

        Args:
            input: Texts to embed.

        Returns:
            The ``embedding`` field of every item in the response's ``data``
            list, in response order.  An empty *input* returns ``[]`` without
            contacting the server.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        if not input:
            return []
        payload = {
            "model": self.model,
            "input": input,
            "encoding_format": "float"
        }
        # A timeout keeps a stalled server from blocking the caller forever.
        response = requests.post(self.url, json=payload, headers=self.headers,
                                 timeout=self.timeout)
        response.raise_for_status()
        data = response.json()
        return [item["embedding"] for item in data["data"]]

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed a list of documents; returns one vector per document."""
        return self._embed(texts)

    def embed_query(self, text: str) -> List[float]:
        """Embed a single query string and return its vector."""
        return self._embed([text])[0]
|
||||
|
||||
class XinferenceReRankerModel:
    """Client for the rerank HTTP endpoint."""

    @staticmethod
    def rerank(query: str, documents: List[str], top_k: int = 10,
               timeout: float = 60.0) -> List[dict]:
        """Re-order *documents* by relevance to *query*.

        Args:
            query: The user's query text.
            documents: Candidate documents to re-rank.
            top_k: Number of top results requested (sent as ``top_n``).
            timeout: Seconds to wait for the HTTP call.

        Returns:
            A list of ``{"document": str, "score": ..., "index": ...}`` dicts
            built from the response's ``results``.  Returns ``[]`` when
            *documents* is empty, when the request fails, or when the
            response does not have the expected structure.
        """
        if not documents:
            # Nothing to rank; skip the round trip.
            return []

        url = "http://10.1.16.39:9995/v1/rerank"
        # The model name is read from the RERANKER_MODEL_NAME environment variable.
        params = {"documents": documents, "query": query, "top_n": top_k, "return_documents": True, "model": os.getenv("RERANKER_MODEL_NAME")}
        headers = {
            "Authorization": "Bearer <token>",  # placeholder: replace with the real token
            "Content-Type": "application/json"
        }

        try:
            response = requests.post(url, json=params, headers=headers, timeout=timeout)
            response.raise_for_status()  # turn HTTP error statuses into exceptions
            results = response.json()
            return [{"document": item["document"]["text"], "score": item["relevance_score"], "index": item["index"]} for item in results["results"]]
        except requests.exceptions.RequestException as e:
            logging.error(f"重排序请求失败: {str(e)}")
            return []
        except (KeyError, TypeError, ValueError) as e:
            # Response was received but is not shaped as expected.
            logging.error(f"重排序响应解析失败: {e}")
            return []
|
||||
|
||||
class OpenAiLLM:
    """Small wrapper that sends a single-turn prompt to an OpenAI-compatible chat API."""

    # Attempts made by invoke() when need_retry is True.
    _MAX_RETRIES = 3
    # Per-request timeout in seconds.
    _REQUEST_TIMEOUT = 300.0

    def __init__(self, **kwargs):
        """Store connection settings; remaining kwargs are forwarded to every request.

        Keyword Args:
            api_key: Fallback key, used when APIKeyManager supplies none.
            base_url: Base URL of the API.
            model: Model name.
            **kwargs: Extra arguments passed to ``chat.completions.create``.

        Raises:
            ValueError: If api_key, base_url or model is missing or None.
        """
        required = ("api_key", "base_url", "model")
        if any(kwargs.get(name) is None for name in required):
            raise ValueError("api_key, base_url, model 不能为空")

        self._api_key = kwargs.pop("api_key")
        self._url = kwargs.pop("base_url")
        self._model = kwargs.pop("model")
        self._kwargs = kwargs

    def _complete(self, client, user_prompt):
        """Send *user_prompt* as one user message and return the first choice's message."""
        completion = client.chat.completions.create(
            model=self._model,
            messages=[{'role': 'user', 'content': user_prompt}],
            timeout=httpx.Timeout(self._REQUEST_TIMEOUT),
            **self._kwargs
        )
        return completion.choices[0].message

    def invoke(self, user_prompt="你是谁?", need_retry=True):
        """Run one chat completion.

        Args:
            user_prompt: Text sent as the single user message.
            need_retry: When True, failures are retried up to three attempts
                with a growing pause (5s, then 10s) between them.

        Returns:
            The message object of the first choice.  With ``need_retry`` an
            empty string is returned after the final failed attempt; without
            it, the exception from the request propagates.
        """
        # Prefer a rotated key; fall back to the key given to the constructor.
        api_key = APIKeyManager.get_api_key() or self._api_key
        client = OpenAI(api_key=api_key, base_url=self._url)

        if not need_retry:
            return self._complete(client, user_prompt)

        for attempt in range(1, self._MAX_RETRIES + 1):
            try:
                return self._complete(client, user_prompt)
            except Exception as e:
                if attempt == self._MAX_RETRIES:
                    logging.error(f"LLM 重试{self._MAX_RETRIES}次后仍然失败: {e}")
                    return ""
                # Linear back-off before the next attempt.
                time.sleep(5 * attempt)
|
||||
|
||||
if __name__ == "__main__":
    # Manual smoke test: rank three sample sentences against one question.
    sample_query = "什么是AI"
    sample_documents = ["AI是人工智能", "AI是机器学习", "AI是深度学习"]
    ranked = XinferenceReRankerModel().rerank(sample_query, sample_documents)
    print(ranked)
|
||||
@@ -0,0 +1,159 @@
|
||||
import os.path
|
||||
|
||||
import requests
|
||||
import json
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class WikijsTool:
|
||||
BASE_URL = "http://10.1.16.39:8090/graphql"
|
||||
HEADERS = {
|
||||
"Authorization": "Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhcGkiOjcsImdycCI6MSwiaWF"
|
||||
"0IjoxNzIzMDIwNzg4LCJleHAiOjE4MTc2OTM1ODgsImF1ZCI6InVybjp3aWtpLmpzIiwiaX"
|
||||
"NzIjoidXJuOndpa2kuanMifQ.NSfE4tB7tkN8yapAs0CgkR-Yll6wc3gO3QGKMAv-TlGxx6A-9fJRmkwhRDTVMj_yPVG6"
|
||||
"NXVy_AZpJtLapRXFGn0cvscsRJxq3fY1KgEyt8wO99jvd8DpNHpHhAIgrtyDelmHsBD2Wb5Ib3WJFsWC6d8Yhm9dkpx6tZ"
|
||||
"vMAlFIKOg6UodMoMIry3YWiPGLaqJPQ0gcKmcnB2tC7sPXIIZnvfb5912GVM0n-4wvWobQnb_tXQuYZf99wH_leXjC_7BK8"
|
||||
"8JSaAmB980i3rBxfejmaJ8E6D48zRxwwPFa0veVjjzRkVqHPwAjl1CXb2HE29pGtNmSEE1kLQVqOZD_ibOwKQ"
|
||||
}
|
||||
|
||||
def __init__(self):
    """Create a WikijsTool; no per-instance state is set up here."""
    pass
|
||||
|
||||
@staticmethod
def init_url():
    """Override BASE_URL / Authorization from an optional ``wikiconfig.json``.

    The file is looked up next to this module.  Its ``url`` entry replaces
    ``WikijsTool.BASE_URL`` and its ``Authorization`` entry replaces the
    header of the same name in ``WikijsTool.HEADERS``.

    Returns:
        False when the file does not exist, True after it has been read.
    """
    config_path = Path(__file__).resolve().parent / 'wikiconfig.json'
    if not config_path.exists():
        return False

    with config_path.open('r', encoding='utf-8') as config_file:
        settings = json.load(config_file)

    if 'url' in settings:
        WikijsTool.BASE_URL = settings['url']
    if 'Authorization' in settings:
        WikijsTool.HEADERS['Authorization'] = settings['Authorization']
    return True
|
||||
|
||||
@staticmethod
def get_all_documents(timeout: float = 60.0) -> list[dict]:
    """Fetch the metadata of every page known to the wiki.

    Args:
        timeout: Seconds to wait for the HTTP call.

    Returns:
        One dict per page with the keys requested in the query
        (path, locale, title, contentType, id, isPublished).

    Raises:
        ValueError: If the server does not answer with HTTP 200, or the
            response body lacks the expected ``data.pages.list`` structure
            (for example because it only carries GraphQL errors).
    """
    query = """
    query Pages {
        pages {
            list {
                path
                locale
                title
                contentType
                id
                isPublished
            }
        }
    }
    """
    data = {
        'query': query,
    }

    response = requests.post(WikijsTool.BASE_URL, headers=WikijsTool.HEADERS,
                             json=data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(f"获取文档列表失败,原因:“{response.text}")

    try:
        return list(json.loads(response.content)['data']['pages']['list'])
    except (KeyError, TypeError) as err:
        # HTTP 200 with a GraphQL-level failure: report it like any other failure.
        raise ValueError(f"获取文档列表失败,原因:“{response.text}") from err
|
||||
|
||||
@staticmethod
def get_all_doc_by_path(path: str, path_is_dir: bool = True) -> list[dict]:
    """Return the pages whose path starts with *path*.

    Args:
        path: Path prefix to filter on.
        path_is_dir: When True a trailing ``/`` is appended to *path* before
            matching, so only pages below that directory match.

    Returns:
        The matching page dicts, in the order returned by get_all_documents().
    """
    pages = WikijsTool.get_all_documents()
    prefix = path + '/' if path_is_dir else path
    return [page for page in pages if str(page["path"]).startswith(prefix)]
|
||||
|
||||
@staticmethod
def search_document(query_str: str, timeout: float = 60.0) -> list[dict]:
    """Search the wiki for pages matching *query_str*.

    The search text is sent as a GraphQL variable instead of being pasted
    into the query document, so quotes, backslashes or braces in the text
    cannot break or alter the query.

    Args:
        query_str: Free-text search string.
        timeout: Seconds to wait for the HTTP call.

    Returns:
        The ``results`` list of the search response; each entry carries
        id, path, locale and title.

    Raises:
        ValueError: If the server does not answer with HTTP 200, or the
            response body lacks ``data.pages.search.results``.
    """
    graphql_query = """
    query Pages($query: String!) {
        pages {
            search(query: $query) {
                results {
                    id
                    path
                    locale
                    title
                }
            }
        }
    }
    """
    data = {
        'query': graphql_query,
        'variables': {'query': query_str},
    }

    response = requests.post(WikijsTool.BASE_URL, headers=WikijsTool.HEADERS,
                             json=data, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(f"查询文档失败,原因:“{response.text}")

    try:
        return json.loads(response.content)['data']['pages']['search']['results']
    except (KeyError, TypeError) as err:
        # HTTP 200 with a GraphQL-level failure: report it like any other failure.
        raise ValueError(f"查询文档失败,原因:“{response.text}") from err
|
||||
|
||||
@staticmethod
def query_doc_info(doc_id: int, timeout: float = 60.0) -> dict:
    """Fetch a single page, including its content, by numeric id.

    Args:
        doc_id: Page id passed as the ``$doc_id`` GraphQL variable.
        timeout: Seconds to wait for the HTTP call.

    Returns:
        The ``data.pages.single`` object of the response (id, path, title,
        isPublished, content, contentType, isPrivate, updatedAt, createdAt),
        or ``{}`` when the response reports GraphQL errors.
    """
    query = """
    query singlePages($doc_id: Int!) {
        pages {
            single(id: $doc_id) {
                id
                path
                title
                isPublished
                content
                contentType
                isPrivate
                updatedAt
                createdAt
            }
        }
    }
    """
    data = {
        'query': query,
        'variables': {'doc_id': doc_id},
    }

    response = requests.post(WikijsTool.BASE_URL, headers=WikijsTool.HEADERS,
                             json=data, timeout=timeout)
    payload = json.loads(response.content)
    # Inspect the parsed top-level "errors" key; a substring search on the raw
    # body would also fire on pages whose content merely contains that word.
    if payload.get('errors'):
        return {}
    return payload['data']['pages']['single']
|
||||
|
||||
|
||||
WikijsTool.init_url()
|
||||
if __name__ == "__main__":
    # Manual smoke test: fetch one page and show the result.
    # The earlier call to WikijsTool.rename_directory was dropped because the
    # class defines no such method, so running the script raised AttributeError.
    print(WikijsTool.query_doc_info(6448))
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,3 @@
|
||||
# Package entry point: expose the HTML -> Markdown conversion function.
from . import custom_markdownify

# Alias for custom_markdownify.md(html, img_download_path, **options).
convert_html_to_md = custom_markdownify.md
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,491 @@
|
||||
import re
|
||||
from textwrap import fill
|
||||
|
||||
import requests
|
||||
from bs4 import NavigableString
|
||||
from bs4 import BeautifulSoup
|
||||
from markdownify import MarkdownConverter, chomp, UNDERLINED, ATX_CLOSED
|
||||
import copy
|
||||
from . import picture_process
|
||||
|
||||
|
||||
# <br>是否是单元格内部的换行符
|
||||
def judge_br_in_table(el):
    """Return True when *el* itself or any of its ancestors is a ``td`` or ``tr``."""
    node = el
    while True:
        if node.name in ('td', 'tr'):
            return True
        # Reached the top of the tree without meeting a table cell or row.
        if node.parent is None:
            return False
        node = node.parent
|
||||
|
||||
|
||||
# 获取div标签中是否为标题,如果是标题则markdown中的返回标题等级
|
||||
def get_markdown_title_level(el):
    """Map a heading-styled ``div`` to its Markdown heading prefix.

    Returns ``'## '`` for class ``hdwiki_tmml``, ``'### '`` for class
    ``hdwiki_tmmll``, and ``''`` for anything else (including non-div
    elements and divs without a class attribute).
    """
    if el.name != 'div' or 'class' not in el.attrs:
        return ''

    css_classes = el.attrs['class']
    # Checked in this order; the first matching class wins.
    for css_class, prefix in (('hdwiki_tmml', '## '), ('hdwiki_tmmll', '### ')):
        if css_class in css_classes:
            return prefix
    return ''
|
||||
|
||||
|
||||
def str_is_title(text) -> bool:
    """Return True when *text*, ignoring surrounding whitespace, begins with ``#``."""
    return text.strip().startswith('#')
|
||||
|
||||
|
||||
# 判断el 是否是图片的DIV标签
|
||||
def is_img_div_tag(el) -> bool:
    """Return True when *el* is a ``div`` whose class attribute contains ``img`` or ``img_l``."""
    if el is None or el.name != "div":
        return False

    css_classes = el.get('class')
    if css_classes is None:
        return False
    return "img" in css_classes or "img_l" in css_classes
|
||||
|
||||
|
||||
# 判断div内部是否是纯文本内容,并且display是否为block
|
||||
def is_only_text_div(el) -> bool:
    """Return True when *el* is a non-empty ``div`` that holds text only.

    A div qualifies when its ``display`` attribute is absent or ``block``
    and either its ``.string`` is a NavigableString, or every descendant
    tag with text is not a table/td/img and has a NavigableString
    ``.string`` (covers markup such as ``<div><b>1. Overview</b> </div>``).
    """
    if el is None or el.name != "div" or el.text == "":
        return False
    if el.get("display", "block") != "block":
        return False

    # Simple case: the div directly wraps a single text node.
    if isinstance(el.string, NavigableString):
        return True

    # Otherwise inspect every descendant tag.
    for descendant in el.find_all(recursive=True):
        if descendant.text == "":
            continue
        if descendant.name in ("table", "td", "img"):
            return False
        if not isinstance(descendant.string, NavigableString):
            return False
    return True
|
||||
|
||||
|
||||
# a标签是否在图片的div标签内部
|
||||
def a_tag_is_in_img(el) -> bool:
    """Return True when *el* is an ``a`` tag whose direct parent is an image ``div``."""
    parent = el.parent
    if parent is None:
        return False

    link_directly_in_div = el.name == "a" and parent.name == "div"
    return is_img_div_tag(parent) if link_directly_in_div else False
|
||||
|
||||
|
||||
class CustomMarkDownConverter(MarkdownConverter):
    """MarkdownConverter with per-tag overrides for the wiki's exported HTML.

    Images found during conversion are handed to
    ``picture_process.process_img_tag`` together with ``img_download_path``.
    Article numbers in the comments refer to wiki pages that motivated a
    particular workaround.
    """

    def __init__(self, img_download_path, **options):
        """Create a converter.

        Args:
            img_download_path: Passed to ``picture_process.process_img_tag``
                for every converted image.
            **options: Forwarded unchanged to ``MarkdownConverter``.
        """
        super().__init__(**options)
        self.img_download_path = img_download_path

    def convert_br(self, el, text, convert_as_inline):
        """Keep line breaks inside table cells as literal ``<br/>``."""
        if judge_br_in_table(el):
            return "<br/>"

        # Tolerance for article 4696: bs4 mis-parses the HTML and nests an
        # image under the <br> tag; emit that content instead of dropping it.
        if text.strip():
            return text + "\n"

        return super().convert_br(el, text, convert_as_inline)

    @staticmethod
    def convert_img_div(text):
        """Insert a newline before the first ``**bold**`` run (the image caption)."""
        match = re.search(r'\*\*(.*?)\*\*', text)
        if match:
            start_index = match.start()
            text = text[:start_index] + "\n" + text[start_index:]
        return text

    def convert_div(self, el, text, convert_as_inline):
        """Render heading divs, image divs and text-only divs."""
        title_level = get_markdown_title_level(el)
        if title_level != '':
            return "\n\n" + title_level + text + '\n\n'

        if is_img_div_tag(el):
            # Separate the image from its caption text with a newline.
            return self.convert_img_div(text)

        if is_only_text_div(el):
            text = "\n\n" + text + "\n\n"

        return text

    @staticmethod
    def is_valid_url(url, timeout=10):
        """Return True when a HEAD request to *url* (following redirects) answers 200.

        Any ``requests`` failure, including exceeding *timeout* seconds,
        yields False.
        """
        try:
            response = requests.head(url, allow_redirects=True, timeout=timeout)
            return response.status_code == 200
        except requests.RequestException:
            return False

    @staticmethod
    def _find_ancestor(el, predicate):
        """Return the nearest ancestor of *el* satisfying *predicate*, or None."""
        node = el.parent
        while node is not None:
            if predicate(node):
                return node
            node = node.parent
        return None

    @staticmethod
    def try_complete_img_description(img_el):
        """Set the image's ``alt`` from the text of its enclosing container.

        The nearest ancestor image ``div`` is tried first, then the nearest
        ancestor ``figure``; the first one with non-empty text wins.
        """
        if img_el is None or img_el.name != "img":
            return

        predicates = (is_img_div_tag, lambda node: node.name == 'figure')
        for predicate in predicates:
            container = CustomMarkDownConverter._find_ancestor(img_el, predicate)
            if container is not None and len(container.text) != 0:
                img_el.attrs["alt"] = container.text
                return

    def convert_figcaption(self, el, text, convert_as_inline):
        """Drop figure captions from the output."""
        return ""

    def convert_img(self, el, text, convert_as_inline):
        """Convert an image, preferring the full-size file over an ``_s`` thumbnail.

        The result is post-processed by ``picture_process.process_img_tag``.
        An empty image (``![]()``) becomes a blank line.
        """
        self.try_complete_img_description(el)
        img_text = super().convert_img(el, text, convert_as_inline)

        # Article 5195: line breaks inside the img tag break Markdown rendering.
        img_text = img_text.replace("\r\n", "").replace("\n", "")
        if img_text == "![]()":
            return '\n\n'

        # When the parent link points at the same file without the "_s"
        # suffix, use the link target as the image source.
        src = el.attrs.get('src', None) or ''
        if el.parent is not None and el.parent.name == "a":
            href = el.parent.attrs.get('href', None) or ''
            href_path = href.rsplit(".", 1)[0]
            src_path = src.rsplit(".", 1)[0]
            if href_path + "_s" == src_path:
                img_text = img_text.replace(src, href)

        if '_s' in img_text:
            src_path, dot, extension = src.rpartition(".")
            # Only derive an original URL when src really has an extension.
            if dot and src_path.endswith('_s'):
                original_url = src_path[:-2] + "." + extension  # strip trailing "_s"
                if self.is_valid_url(original_url):
                    img_text = img_text.replace(src, original_url)

        # Rewrite the link and download the picture.
        return picture_process.process_img_tag(img_text, self.img_download_path)

    @staticmethod
    def is_img_describe_strong(el) -> bool:
        """Return True when *el*'s text equals the alt or title of a sibling-level image.

        Images are searched below *el*'s parent.
        """
        if el is None or el.parent is None:
            return False
        if len(el.contents) == 0:
            return False

        img_list = el.parent.find_all("img")
        if len(img_list) == 0:
            return False

        for img_tag in img_list:
            alt = img_tag.get("alt", None)
            title = img_tag.get("title", None)
            if alt is None and title is None:
                continue
            if alt == el.text or title == el.text:
                return True

        return False

    def convert_b(self, el, text, convert_as_inline):
        """Convert bold text, leaving headings and image captions un-bolded."""
        # A <b> that wraps a single heading div is passed through untouched so
        # the heading is not bolded (example article 6925).
        if len(el.contents) == 1:
            title_level = get_markdown_title_level(el.contents[0])
            if title_level != '':
                return text

        # Content that already starts with '#' is a heading: do not bold it.
        if str_is_title(text):
            return text

        # Text duplicating an image's alt/title is dropped.
        if self.is_img_describe_strong(el):
            return ""

        text = text.strip(" \t")
        suffix = ""
        if text.endswith("\n"):
            suffix = " \n"
        b_text = super().convert_b(el, text, convert_as_inline)

        # Pad with spaces so adjacent bold runs do not fuse into
        # "**1.****Overview**" (articles 2377, 4292 and others).
        return " " + b_text + suffix + " "

    convert_strong = convert_b

    def convert_p(self, el, text, convert_as_inline):
        """Convert a paragraph, surrounding it with blank lines.

        A <p> may be followed directly by a heading div, hence the padding.
        """
        if convert_as_inline:
            return text
        if self.options['wrap']:
            text = fill(text,
                        width=self.options['wrap_width'],
                        break_long_words=False,
                        break_on_hyphens=False)
        return '\n\n%s\n\n' % text if text else ''

    def convert_a(self, el, text, convert_as_inline):
        """Convert a link; links whose target is an image file yield only their text."""
        prefix, suffix, text = chomp(text)
        if not text:
            return ''
        href = el.get('href')
        if self.is_href_img(href):
            return text
        title = el.get('title')
        # Article 5195: line breaks inside the tag break Markdown rendering.
        if title is not None:
            title = title.replace("\n", "")
        # For the replacement see #29: text nodes underscores are escaped
        if (self.options['autolinks']
                and text.replace(r'\_', '_') == href
                and not title
                and not self.options['default_title']):
            # Shortcut syntax
            return '<%s>' % href
        if self.options['default_title'] and not title:
            title = href
        title_part = ' "%s"' % title.replace('"', r'\"') if title else ''

        a_tag = '%s[%s](%s%s)%s' % (prefix, text, href, title_part, suffix) if href else text
        return a_tag

    @staticmethod
    def is_href_img(href_url) -> bool:
        """Return True when *href_url* ends in a jpg/jpeg/png/gif extension (case-insensitive)."""
        if href_url is None:
            return False
        file_extension = href_url.split(".")[-1].lower()
        return file_extension in ["jpg", "jpeg", "png", "gif"]

    def convert_li(self, el, text, convert_as_inline):
        """Convert a list item; blank items produce no output (article 4347)."""
        if not text.strip():
            return ""
        return super().convert_li(el, text, convert_as_inline)

    def convert_td(self, el, text, convert_as_inline):
        """Convert a table cell, turning line breaks into ``<br>``."""
        text = text.replace("\r\n", "<br>").replace("\n", "<br>")
        return ' ' + text + ' |'

    def convert_hn(self, n, el, text, convert_as_inline):
        """Convert an ``h<n>`` heading according to the ``heading_style`` option."""
        if convert_as_inline:
            return text

        style = self.options['heading_style'].lower()
        text = text.rstrip()
        if style == UNDERLINED and n <= 2:
            line = '=' if n == 1 else '-'
            return self.underline(text, line)
        hashes = '#' * n
        hashes = hashes + " "
        if style == ATX_CLOSED:
            return '\n\n %s %s %s\n\n' % (hashes, text, hashes)
        return '\n\n%s %s\n\n' % (hashes, text)

    @staticmethod
    def convert_thead_table(el, text, cell_name, convert_as_inline):
        """Render a table row, treating it as a header row when every cell is *cell_name*."""
        cells = el.find_all(['td', 'th'])
        is_headrow = all(cell.name == cell_name for cell in cells)
        overline = ''
        underline = ''
        if is_headrow and not el.previous_sibling:
            # first row and is headline: print headline underline
            underline += '| ' + ' | '.join(['---'] * len(cells)) + ' |' + '\n'
        elif (not el.previous_sibling
              and (el.parent.name == 'table'
                   or (el.parent.name == 'tbody'
                       and not el.parent.previous_sibling))):
            # first row, not headline, and:
            # - the parent is table or
            # - the parent is tbody at the beginning of a table.
            # print empty headline above this row
            overline += '| ' + ' | '.join([''] * len(cells)) + ' |' + '\n'
            overline += '| ' + ' | '.join(['---'] * len(cells)) + ' |' + '\n'
        return overline + '|' + text + '\n' + underline

    def convert_tr(self, el, text, convert_as_inline):
        """Convert a table row, with special handling for thead and colgroup layouts."""
        # Rows inside <thead> whose cells are <td> (articles 4061, 1976).
        if el and el.parent and el.parent.name == "thead":
            return CustomMarkDownConverter.convert_thead_table(el, text, 'td', convert_as_inline)

        # table -> colgroup, tbody -> tr layout (article 4364).
        if (el and el.parent and el.parent.previous_sibling
                and el.parent.name == "tbody"
                and el.parent.previous_sibling.name == "colgroup"):
            return CustomMarkDownConverter.convert_thead_table(el, text, 'td', convert_as_inline)

        return super().convert_tr(el, text, convert_as_inline)

    def convert_pre(self, el, text, convert_as_inline):
        """Pass <pre> content through; article 5192 uses <pre> for non-code text."""
        return text

    def escape(self, text):
        """Escape Markdown-significant characters according to the escape_* options."""
        if not text:
            return ''
        if self.options['escape_misc']:
            text = re.sub(r'([\\&<`[>~#%=+|-])', r'\\\1', text)
            # Escaping "1." / "1)" number markers is deliberately not done here.
        if self.options['escape_asterisks']:
            text = text.replace('*', r'\*')
        if self.options['escape_underscores']:
            text = text.replace('_', r'\_')
        return text

    @staticmethod
    def convert_span(el, text, convert_as_inline):
        """Put a ``display: block`` span on its own paragraph.

        Article 3526: an image caption inside such a span otherwise sticks to
        the image.  The style is parsed per declaration, so ``display:block``
        with or without spaces around the colon is recognised.
        """
        style_attr = el.attrs.get('style')
        if not style_attr:
            return text

        for declaration in style_attr.split(';'):
            prop, colon, value = declaration.partition(':')
            if not colon or prop.strip().lower() != 'display':
                continue
            if value.strip().lower() == "block" and text != "":
                return f"\n\n{text}\n\n"
        return text
|
||||
|
||||
|
||||
def expand_html_table(html) -> tuple[str, bool]:
    """Flatten merged cells so every table is a plain grid.

    Each cell with ``colspan`` / ``rowspan`` is repeated into every grid slot
    it covers.  Rebuilt cells keep the original tag name (``td`` / ``th``)
    and the cell's text only.  Spans that are missing, non-numeric or below 1
    count as 1, and a rowspan reaching past the last row is clipped.

    Args:
        html: HTML source text.

    Returns:
        ``(html, False)`` unchanged when the input has no table, otherwise
        ``(rewritten_html, True)``.
    """

    def span_of(cell, attribute):
        # Malformed span attributes fall back to a single row/column.
        try:
            return max(int(cell.get(attribute, 1)), 1)
        except (TypeError, ValueError):
            return 1

    soup = BeautifulSoup(html, 'html.parser')
    tables = soup.find_all('table')
    if len(tables) == 0:
        return html, False

    for table in tables:
        table_rows = table.find_all('tr')
        # grid[r][c] holds (tag_name, text) or None; rows grow on demand.
        grid = [[] for _ in table_rows]

        for r, row in enumerate(table_rows):
            c = 0
            for col in row.find_all(['td', 'th']):
                # Skip slots already filled by a rowspan from an earlier row.
                while c < len(grid[r]) and grid[r][c] is not None:
                    c += 1
                colspan = span_of(col, 'colspan')
                rowspan = span_of(col, 'rowspan')
                cell = (col.name, col.get_text())
                # Slicing clips a rowspan that would run past the last row.
                for target_row in grid[r:r + rowspan]:
                    shortfall = c + colspan - len(target_row)
                    if shortfall > 0:
                        target_row.extend([None] * shortfall)
                    # Repeat the content in every slot of the merged area.
                    for j in range(colspan):
                        target_row[c + j] = cell
                c += colspan

        # Build the replacement table.
        new_table = soup.new_tag('table', border="1", cellspacing="0")
        tbody = soup.new_tag('tbody')
        new_table.append(tbody)
        for grid_row in grid:
            tr = soup.new_tag('tr')
            for cell in grid_row:
                if cell is not None:
                    tag_name, cell_text = cell
                    td = soup.new_tag(tag_name)
                    td.string = cell_text
                    tr.append(td)
            tbody.append(tr)

        # Swap the old table for the flattened one.
        table.replace_with(new_table)

    return str(soup), True
|
||||
|
||||
|
||||
# Create shorthand method for conversion
|
||||
def md(html, img_download_path, **options):
    """Convert *html* to Markdown.

    Tables are flattened with expand_html_table() first, then the document is
    converted by CustomMarkDownConverter and its blank lines are normalised.

    Args:
        html: HTML source text.
        img_download_path: Passed to CustomMarkDownConverter.
        **options: Converter options, forwarded unchanged.

    Returns:
        The Markdown text.
    """
    flattened_html, _ = expand_html_table(html)
    converter = CustomMarkDownConverter(img_download_path, **options)
    markdown = converter.convert(flattened_html)

    # Collapse whitespace-only gaps between newlines into one blank line.
    markdown = re.sub(r'\n\s*\n', '\n\n', markdown)
    # Cap any remaining run of three or more newlines at three.
    return re.sub(r'\n{3,}', '\n\n\n', markdown)
|
||||
@@ -0,0 +1,170 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import uuid
|
||||
from urllib.parse import urljoin
|
||||
import requests
|
||||
|
||||
|
||||
def get_img_tag_url(img_tag):
    """Return the link of the first Markdown image found in ``img_tag``.

    Args:
        img_tag (str): text containing a ``![alt](link "title")`` style tag.

    Returns:
        str: the link with anything after the first space (such as a title)
        removed, or ``""`` when no image tag is present.
    """
    found = re.search(r'\!\[.*?\]\((.*?)\)', img_tag)
    if found is None:
        return ""
    # The parenthesised part may be `link "title"`; keep only the link.
    target, _, _ = found.group(1).partition(" ")
    return target
|
||||
|
||||
|
||||
# 填充img标签中的图片链接
|
||||
# img_tag ''
|
||||
# img_tag ''
|
||||
def fill_img_url(img_tag):
    """
    Complete the image link inside a Markdown image tag.

    Absolute ``http://`` / ``https://`` links are returned untouched. Any
    other link is resolved against the ``IMG_URL_PREFIX`` environment
    variable and the tag is rewritten to use the resolved link.

    Args:
        img_tag (str): the original image tag.

    Returns:
        tuple: ``(img_tag, link)`` - the (possibly rewritten) tag and the
        complete image link. ``link`` is ``''`` when the tag contains no
        image link, or when the link is relative and ``IMG_URL_PREFIX`` is
        not set.
    """
    # A single image tag must not contain line breaks.
    img_tag = img_tag.replace("\n", "")
    link = get_img_tag_url(img_tag)
    if not link:
        return img_tag, ''

    # Already a full link: leave it alone. Testing the scheme prefix (instead
    # of searching for "http:" anywhere in the link) also recognises https
    # links and does not misclassify relative links that merely embed a URL.
    if link.lower().startswith(("http://", "https://")):
        return img_tag, link

    base_url = os.getenv("IMG_URL_PREFIX")
    if not base_url:
        # Relative link and nothing to resolve it against.
        return img_tag, ''

    # Resolve the relative link and patch it into the tag.
    full_link = urljoin(base_url, link)
    return img_tag.replace(link, full_link), full_link
|
||||
|
||||
|
||||
def download_picture(img_tag, download_path):
    """Download the image referenced by ``img_tag`` and point the tag at the local copy.

    The local file keeps the last path segment of the image URL as its name.
    A file that already exists is reused and not downloaded again.

    Args:
        img_tag (str): Markdown image tag.
        download_path (str): directory in which the image file is stored.

    Returns:
        str: the tag with its URL replaced by the local file path (using
        ``/`` separators). The tag is returned with its original link when
        no complete URL can be determined or when the download fails.
    """
    request_timeout = 30  # seconds; without a timeout a stalled server blocks forever
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/94.0.4606.71 Safari/537.36 '
    }
    img_tag, img_url = fill_img_url(img_tag)
    if img_url == '':
        return img_tag

    file_name = img_url.split("/")[-1]
    # Build the path portably, then normalise to forward slashes so the
    # result is usable inside Markdown on every platform.
    file_path = os.path.normpath(os.path.join(download_path, file_name))
    file_path = file_path.replace("\\", "/")

    # Do not download again when the file is already present.
    if not os.path.exists(file_path):
        try:
            response = requests.get(url=img_url, headers=headers, timeout=request_timeout)
            # Do not store an HTTP error page as if it were the image.
            response.raise_for_status()
            with open(file_path, 'wb') as fp:
                fp.write(response.content)
        except (requests.RequestException, OSError):
            # Best effort, matching download_picture_from_other_url:
            # keep the original tag when the image cannot be fetched or saved.
            logging.warning(f"img download error url:{img_url}")
            return img_tag
        logging.info(f"图片下载成功:{img_url}")

    # Replace the URL in the tag with the path of the downloaded file.
    return img_tag.replace(img_url, file_path)
|
||||
|
||||
|
||||
def download_picture_from_other_url(img_tag, download_path):
    """Download an externally hosted image and point the tag at the local copy.

    The image is stored under a freshly generated UUID name with a ``.png``
    extension inside ``download_path``.

    Args:
        img_tag (str): Markdown image tag.
        download_path (str): directory in which the image file is stored.

    Returns:
        str: the tag with its URL replaced by the local file path (using
        ``/`` separators, like the other image helpers in this module). The
        tag is returned with its original link when no complete URL can be
        determined or when the download fails.
    """
    request_timeout = 30  # seconds; without a timeout a stalled server blocks forever
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/94.0.4606.71 Safari/537.36 '
    }
    img_tag, img_url = fill_img_url(img_tag)
    if img_url == '':
        # Nothing to fetch; avoid issuing a request with an empty URL.
        return img_tag

    # A new uuid4 name cannot collide with an existing file, so no
    # "already downloaded" check is needed here.
    file_path = os.path.join(download_path, f"{uuid.uuid4()}.png")
    file_path = os.path.normpath(file_path).replace("\\", "/")

    try:
        response = requests.get(url=img_url, headers=headers, timeout=request_timeout)
        # Do not store an HTTP error page as if it were the image.
        response.raise_for_status()
        with open(file_path, 'wb') as fp:
            fp.write(response.content)
        logging.info(f"图片下载成功:{img_url}")
    except Exception:
        # Deliberately best effort: external hosts are unreliable, so any
        # failure keeps the original tag instead of aborting the conversion.
        logging.warning(f"img download error url:{img_url}")
        return img_tag

    # Replace the URL in the tag with the path of the downloaded file.
    return img_tag.replace(img_url, file_path)
|
||||
|
||||
|
||||
def extract_base64_from_data_uri(data_uri):
    """Extract the base64 payload from a data URI or from an image tag wrapping one.

    Args:
        data_uri (str): either a bare ``data:...;base64,<payload>`` URI or a
            Markdown image tag of the form ``![alt](data:...;base64,<payload>)``.

    Returns:
        str | None: the text following the first ``base64,`` marker, without
        the closing ``)`` of an image tag when present; ``None`` when the
        input contains no ``base64,`` marker.
    """
    marker = "base64,"
    start = data_uri.find(marker)
    if start == -1:
        return None
    payload = data_uri[start + len(marker):]
    # Only strip the tag's closing parenthesis when it is really there;
    # blindly dropping the last character would corrupt a bare data URI.
    if payload.endswith(")"):
        payload = payload[:-1]
    return payload
|
||||
|
||||
|
||||
def picture_base64(img_tag, picture_save_path):
    """Save a base64-embedded image to disk and return a tag that points to the file.

    The file is named after the MD5 digest of the base64 text, so identical
    images map to the same file and are written only once.

    Args:
        img_tag (str): Markdown image tag whose link is a base64 data URI.
        picture_save_path (str): directory in which the image is stored.

    Returns:
        str: ``![](path)`` when the tag has no alt text, otherwise
        ``![alt](path "alt")``. Returns ``""`` when ``picture_save_path`` is
        ``None`` or empty, and the unchanged ``img_tag`` when no base64
        payload can be extracted or decoded.
    """
    if picture_save_path is None or picture_save_path == "":
        return ""

    base64_str = extract_base64_from_data_uri(img_tag)
    if base64_str is None:
        # Without a payload there is nothing to hash or decode.
        logging.warning("no base64 payload found in img tag")
        return img_tag

    # The MD5 of the image content (its base64 text) serves as the file name.
    img_md5 = hashlib.md5(base64_str.encode()).hexdigest()
    # Build the path portably, then normalise to forward slashes so the
    # result is usable inside Markdown on every platform.
    file_path = os.path.normpath(os.path.join(picture_save_path, "%s.png" % img_md5))
    file_path = file_path.replace("\\", "/")

    # Do not write the file again when it already exists.
    if not os.path.exists(file_path):
        try:
            decoded_bytes = base64.b64decode(base64_str)
        except ValueError:
            # binascii.Error (malformed base64) is a ValueError subclass.
            logging.warning("invalid base64 payload in img tag")
            return img_tag
        with open(file_path, 'wb') as fp:
            fp.write(decoded_bytes)

    # Re-emit the tag with the saved file as its target, keeping the alt text.
    match = re.search(r"\[(.*?)\]", img_tag)
    alt_text = match.group(1) if match else ""
    if alt_text == "":
        return "![](%s)" % file_path
    # The alt text is reused as the image title.
    return '![%s](%s "%s")' % (alt_text, file_path, alt_text)
|
||||
|
||||
|
||||
def process_img_tag(str_img_tag, img_path):
    """Dispatch a Markdown image tag to the handler that fits its link type.

    Args:
        str_img_tag (str): the image tag text.
        img_path (str): directory where images are stored.

    Returns:
        str: ``""`` for tags pointing at local ``file:///`` paths or when
        ``img_path`` is ``None``/empty; otherwise the value returned by the
        chosen handler, or the unchanged tag when no handler applies.
    """
    # Tags that point at a local disk path are dropped.
    if "file:///" in str_img_tag:
        logging.warning(f"存在非法的链接地址:{str_img_tag}")
        return ""

    # Without a storage directory nothing can be saved.
    if img_path in (None, ""):
        return ""

    # Inline base64 PNG data is decoded and written to disk.
    if "data:image/png;base64" in str_img_tag:
        return picture_base64(str_img_tag, img_path)

    # (Some pages, e.g. 4696, contain img tags that point to external links;
    # they are kept for now rather than removed.)
    if "http://" in str_img_tag or "https://" in str_img_tag:
        return download_picture_from_other_url(str_img_tag, img_path)

    # Remaining case: a link starting with "http" that is neither http://
    # nor https:// is left untouched; every other link is downloaded.
    if get_img_tag_url(str_img_tag).startswith("http"):
        logging.warning(f"未处理的图片标签:{str_img_tag}")
        return str_img_tag

    return download_picture(str_img_tag, img_path)
|
||||
@@ -0,0 +1,4 @@
|
||||
{
|
||||
"url":"http://10.1.0.145:8090/graphql",
|
||||
"Authorization":"Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhcGkiOjEsImdycCI6MSwiaWF0IjoxNzIzNjMxMjcwLCJleHAiOjE4MTgzMDQwNzAsImF1ZCI6InVybjp3aWtpLmpzIiwiaXNzIjoidXJuOndpa2kuanMifQ.g5H1xVMtk7Q3uvrRdtD3aTm49dQkS11cYdDKIwXo7DthOOTGj9DmFO7yILNDU7XFACTZc1Ej6ryguYV_8vGqoc-Rc7LciwvqS_RHDYUKZNKENbv8df9UGDMB-F9DT_airGc1lGJXgVqypxejDL3fY8aRMGXm7GBIlZKY4JTeI2uJZxffgfqKGrOvc3EOtsGgJzKZo4OyQ8UInGtCTiuq6-mLj_Syix_1z52K1tgfnF4E4-rZH_zCD05hUlUMYUV-KWhPkeOEGR5xbRTrulfCvzDD4T0CX4pI-keSKmgVn1HYSSN4o1Tj_l9zsyhUoLRzhzPK29Q3uekIc9obrvCHrg"
|
||||
}
|
||||
Reference in New Issue
Block a user