import base64
import hashlib
import logging
import os
import re
import uuid
from urllib.parse import urljoin

import requests

# Browser-like User-Agent sent with every image request.
_REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                  'Chrome/94.0.4606.71 Safari/537.36 '
}

# Seconds to wait for the server before giving up on a download, so a
# stalled connection cannot block the caller forever.
_REQUEST_TIMEOUT = 30

# Captures whatever sits between the parentheses of a Markdown image tag.
_IMG_TARGET_PATTERN = re.compile(r'\!\[.*?\]\((.*?)\)')

# Captures the text between the first pair of square brackets (the alt text).
_IMG_ALT_PATTERN = re.compile(r"\[(.*?)\]")


def _local_image_path(directory, file_name):
    """Return ``directory/file_name`` normalised and using forward slashes only."""
    joined = os.path.normpath(os.path.join(directory, file_name))
    return joined.replace("\\", "/")


def _fetch_image(img_url, file_path):
    """Download ``img_url`` and write the response body to ``file_path``.

    Raises:
        requests.RequestException: on connection errors, timeouts, invalid
            URLs, or an HTTP error status (4xx/5xx).
        OSError: if ``file_path`` cannot be written.
    """
    response = requests.get(url=img_url, headers=_REQUEST_HEADERS, timeout=_REQUEST_TIMEOUT)
    # Without this check an HTML error page would be stored as an "image".
    response.raise_for_status()
    with open(file_path, 'wb') as fp:
        fp.write(response.content)


def get_img_tag_url(img_tag):
    """Return the URL of the first Markdown image tag found in ``img_tag``.

    Args:
        img_tag (str): text containing a tag such as ``![alt](url "title")``.

    Returns:
        str: the URL with any trailing title removed, or ``""`` when no
        image tag is found.
    """
    match = _IMG_TARGET_PATTERN.search(img_tag)
    if not match:
        return ""
    # The target may be followed by a space and a quoted title; keep the URL only.
    return match.group(1).split(" ")[0]


# Example inputs:
# img_tag '![1](http://wiki.jxbw.com/hdwiki/uploads/202303/1679471232U4iPCjtm_s.jpg "1")'
# img_tag '![1](uploads/202303/1679471232U4iPCj6tm_s.jpg "1")'
def fill_img_url(img_tag):
    """Complete the image link inside a Markdown image tag.

    Relative links are joined onto the ``IMG_URL_PREFIX`` environment
    variable; absolute ``http://`` / ``https://`` links are left untouched.

    Args:
        img_tag (str): the original image tag.

    Returns:
        tuple: ``(img_tag, full_link)`` where ``img_tag`` has line breaks
        removed and its link completed when possible. ``full_link`` is ``''``
        when the tag has no link, or when the link is relative and
        ``IMG_URL_PREFIX`` is not set.
    """
    # A single image tag must not contain line breaks.
    img_tag = img_tag.replace("\n", "")
    link = get_img_tag_url(img_tag)
    if not link:
        return img_tag, ''

    # Already a full link (either scheme): nothing to complete.
    if link.lower().startswith(("http://", "https://")):
        return img_tag, link

    base_url = os.getenv("IMG_URL_PREFIX")
    if not base_url:
        return img_tag, ''

    full_link = urljoin(base_url, link)
    return img_tag.replace(link, full_link), full_link


def download_picture(img_tag, download_path):
    """Download the image referenced by ``img_tag`` into ``download_path``.

    The local file is named after the last path segment of the image URL and
    is not downloaded again when it already exists.

    Args:
        img_tag (str): Markdown image tag.
        download_path (str): directory that receives the image file.

    Returns:
        str: the tag with its URL replaced by the local file path, or the tag
        with only its URL completed when the URL cannot be resolved or the
        download fails.
    """
    img_tag, img_url = fill_img_url(img_tag)
    if img_url == '':
        return img_tag

    file_name = img_url.split("/")[-1]
    file_path = _local_image_path(download_path, file_name)

    # Skip the download when the file already exists.
    if not os.path.exists(file_path):
        try:
            _fetch_image(img_url, file_path)
        except (requests.RequestException, OSError):
            # Best effort, like download_picture_from_other_url: one broken
            # image must not abort processing of the whole document.
            logging.warning("img download error url:%s", img_url)
            return img_tag
        logging.info("图片下载成功:%s", img_url)

    # Point the tag at the downloaded file.
    return img_tag.replace(img_url, file_path)


def download_picture_from_other_url(img_tag, download_path):
    """Download an externally hosted image into ``download_path``.

    The image is stored under a freshly generated UUID file name with a
    ``.png`` extension.

    Args:
        img_tag (str): Markdown image tag.
        download_path (str): directory that receives the image file.

    Returns:
        str: the tag with its URL replaced by the local file path, or the tag
        with only its URL completed when the download fails.
    """
    img_tag, img_url = fill_img_url(img_tag)
    if img_url == '':
        # No usable URL: report it like a failed download, without a request.
        logging.warning("img download error url:%s", img_url)
        return img_tag

    # A UUID name is always new, so no "already exists" check is needed.
    file_path = os.path.normpath(os.path.join(download_path, f"{uuid.uuid4()}.png"))
    try:
        _fetch_image(img_url, file_path)
    except Exception:
        # Deliberately broad: external links are best effort and must never
        # break the caller.
        logging.warning("img download error url:%s", img_url)
        return img_tag
    logging.info("图片下载成功:%s", img_url)

    # Point the tag at the downloaded file.
    return img_tag.replace(img_url, file_path)


def extract_base64_from_data_uri(data_uri):
    """Extract the base64 payload from a data URI or an image tag holding one.

    Args:
        data_uri (str): e.g. ``data:image/png;base64,AAAA`` or
            ``![](data:image/png;base64,AAAA)``.

    Returns:
        str | None: the base64 payload, or ``None`` when the text before the
        last comma does not end with ``base64``.
    """
    # Base64 text never contains a comma, so the payload follows the last one.
    header, separator, payload = data_uri.rpartition(',')
    if not separator or not header.endswith('base64'):
        return None
    # Drop the closing parenthesis of the Markdown tag, if there is one.
    if payload.endswith(')'):
        payload = payload[:-1]
    return payload


def picture_base64(img_tag, picture_save_path):
    """Save the base64 image embedded in ``img_tag`` as a local PNG file.

    The file is named after the MD5 of the base64 text, so identical images
    share one file and are written only once.

    Args:
        img_tag (str): e.g. ``![](data:image/png;base64,AAAA)``.
        picture_save_path (str): directory that receives the image file.

    Returns:
        str: a Markdown image tag pointing at the saved file; the placeholder
        ``![picture](空)`` when no save directory is given; or ``img_tag``
        unchanged when the payload cannot be extracted or decoded.
    """
    base64_str = extract_base64_from_data_uri(img_tag)
    if picture_save_path is None or picture_save_path == "":
        return "![picture](空)"
    if base64_str is None:
        logging.warning("no base64 payload found in img tag")
        return img_tag

    # Hash the image content to get a stable file name.
    img_md5 = hashlib.md5(base64_str.encode()).hexdigest()
    picture_save_path = _local_image_path(picture_save_path, "%s.png" % img_md5)

    # Do not write the file again when it already exists.
    if not os.path.exists(picture_save_path):
        try:
            decoded_bytes = base64.b64decode(base64_str)
        except ValueError:
            # binascii.Error (malformed base64) is a ValueError subclass.
            logging.warning("invalid base64 payload in img tag")
            return img_tag
        with open(picture_save_path, 'wb') as fp:
            fp.write(decoded_bytes)

    # Rebuild the tag around the saved file, keeping the alt text if present.
    match = _IMG_ALT_PATTERN.search(img_tag)
    alt_text = match.group(1) if match else ""
    if alt_text == "":
        return "![picture](%s)" % picture_save_path
    return "![%s](%s \"%s\")" % (alt_text, picture_save_path, alt_text)


def process_img_tag(str_img_tag, img_path):
    """Localise the image referenced by a Markdown image tag.

    Args:
        str_img_tag (str): the Markdown image tag to process.
        img_path (str): directory in which images are stored.

    Returns:
        str: ``""`` for tags pointing at a local ``file:///`` path, the
        placeholder ``![picture](空)`` when ``img_path`` is empty, otherwise
        the result of the matching handler (base64, external URL, or
        relative URL).
    """
    # Tags that point at the local disk are dropped entirely.
    if "file:///" in str_img_tag:
        logging.warning(f"存在非法的链接地址:{str_img_tag}")
        return ""

    if img_path is None or img_path == "":
        return "![picture](空)"

    if "data:image/png;base64" in str_img_tag:
        return picture_base64(str_img_tag, img_path)

    # Some documents (4696 among others) contain image tags that point to
    # external links; they are kept for now rather than removed.
    if "http://" in str_img_tag or "https://" in str_img_tag:
        return download_picture_from_other_url(str_img_tag, img_path)

    if not get_img_tag_url(str_img_tag).startswith("http"):
        return download_picture(str_img_tag, img_path)

    logging.warning(f"未处理的图片标签:{str_img_tag}")
    return str_img_tag