diff --git a/rag2_0/intent_recognition/IntentRecognition.py b/rag2_0/intent_recognition/IntentRecognition.py index c79099c..9366a99 100755 --- a/rag2_0/intent_recognition/IntentRecognition.py +++ b/rag2_0/intent_recognition/IntentRecognition.py @@ -32,7 +32,7 @@ from .DataModels import ( StepBackPrompt, HypotheticalDocument ) from .ProfessionalNounVector import ProfessionalNounRetriever, AsyncProfessionalNounRetriever -from rag2_0.tool.ModelTool import XinferenceReRankerModel, OpenAiLLM, SiliconFlowReRankerModel +from rag2_0.tool.ModelTool import XinferenceReRankerModel, OpenAiLLM class AsyncIntentRecognizer: SOFT_WIKI_PATH = "data/wiki_data" diff --git a/rag2_0/tool/ModelTool.py b/rag2_0/tool/ModelTool.py index 0d0c665..2eefb86 100755 --- a/rag2_0/tool/ModelTool.py +++ b/rag2_0/tool/ModelTool.py @@ -19,7 +19,7 @@ import requests import os import logging from rag2_0.tool.APIKeyManager import APIKeyManager - +from urllib.parse import urljoin class SiliconFlowEmbeddings(Embeddings): """SiliconFlow嵌入模型封装""" diff --git a/rag2_0/tool/html_to_md/__init__.py b/rag2_0/tool/html_to_md/__init__.py deleted file mode 100755 index b4513a9..0000000 --- a/rag2_0/tool/html_to_md/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from . import custom_markdownify - -convert_html_to_md = custom_markdownify.md diff --git a/rag2_0/tool/html_to_md/custom_markdownify.py b/rag2_0/tool/html_to_md/custom_markdownify.py deleted file mode 100755 index abca9b3..0000000 --- a/rag2_0/tool/html_to_md/custom_markdownify.py +++ /dev/null @@ -1,491 +0,0 @@ -import re -from textwrap import fill - -import requests -from bs4 import NavigableString -from bs4 import BeautifulSoup -from markdownify import MarkdownConverter, chomp, UNDERLINED, ATX_CLOSED -import copy -from . import picture_process - - -#
是否是单元格内部的换行符 -def judge_br_in_table(el): - if el.name in ['td', 'tr']: - return True - if el.parent is None: - return False - # 递归父级元素 - return judge_br_in_table(el.parent) - - -# 获取div标签中是否为标题,如果是标题则markdown中的返回标题等级 -def get_markdown_title_level(el): - if el.name != 'div' or 'class' not in el.attrs: - return '' - title_level = '' - if 'hdwiki_tmml' in el.attrs['class']: - title_level = '## ' - elif 'hdwiki_tmmll' in el.attrs['class']: - title_level = '### ' - return title_level - - -def str_is_title(text) -> bool: - text = text.strip() - pattern = r'^#+' - - # 使用re.search匹配字符串开头的 # 符号 - match = re.search(pattern, text) - if match: - return True - else: - return False - - -# 判断el 是否是图片的DIV标签 -def is_img_div_tag(el) -> bool: - if el is None: - return False - if el.name != "div": - return False - class_attr = el.get('class') - if class_attr is None: - return False - if "img" in class_attr or "img_l" in class_attr: - return True - else: - return False - - -# 判断div内部是否是纯文本内容,并且display是否为block -def is_only_text_div(el) -> bool: - if el is None or el.name != "div" or el.text == "": - return False - - if el.get("display", "block") != "block": - return False - - # div标签下只包含文本 - if isinstance(el.string, NavigableString): - return True - - # 兼容
1. 版本概述 
判断错误问题 - # 递归获取所有子标签 - child_tags = el.find_all(recursive=True) - for tag in child_tags: - if tag.text == "": - continue - if tag.name in ["table", "td", "img"]: - return False - if isinstance(tag.string, NavigableString): - continue - else: - return False - - return True - - -# a标签是否在图片的div标签内部 -def a_tag_is_in_img(el) -> bool: - if el.parent is None: - return False - if el.name != "a" or el.parent.name != "div": - return False - - return is_img_div_tag(el.parent) - - -class CustomMarkDownConverter(MarkdownConverter): - """ - 创建自定义的换行装换函数 - """ - - def __init__(self, img_download_path, **options): - super().__init__(**options) - self.img_download_path = img_download_path - - # 单元格内的换行依旧保持
格式 - def convert_br(self, el, text, convert_as_inline): - if judge_br_in_table(el): - return "
" - - # 容错处理(文章4696),因bs4解析html错误 导致将 分类图标签 解析到了br标签下导致图片丢失 - if text.strip(): - return text + "\n" - - return super().convert_br(el, text, convert_as_inline) - - # 图片div标签 在图片与图片描述之间添加换行 - @staticmethod - def convert_img_div(text): - pattern = r'\*\*(.*?)\*\*' - match = re.search(pattern, text) - if match: - start_index = match.start() - text = text[:start_index] + "\n" + text[start_index:] - return text - - # 装换标题格式 - def convert_div(self, el, text, convert_as_inline): - title_level = get_markdown_title_level(el) - if title_level != '': - return "\n\n" + title_level + text + '\n\n' - - if is_img_div_tag(el): - # 图片与图片描述文字之间掺入换行符 - return self.convert_img_div(text) - - if is_only_text_div(el): - text = "\n\n" + text + "\n\n" - - return text - - # 检查 URL 是否有效的函数 - @staticmethod - def is_valid_url(url): - try: - response = requests.head(url, allow_redirects=True) - return response.status_code == 200 - except requests.RequestException: - return False - - @staticmethod - def try_complete_img_description(img_el): - if img_el is None or img_el.name != "img": - return - - # 找到父级的div标签 - img_el_parent_div = None - cur_el = img_el - while cur_el.parent is not None: - if is_img_div_tag(cur_el.parent): - img_el_parent_div = cur_el.parent - break - cur_el = cur_el.parent - - if img_el_parent_div is not None and len(img_el_parent_div.text) != 0: - img_el.attrs["alt"] = img_el_parent_div.text - return - - # 找到父级的figure标签 - img_el_parent_div = None - cur_el = img_el - while cur_el.parent is not None: - if cur_el.parent is not None and cur_el.parent.name == 'figure': - img_el_parent_div = cur_el.parent - break - cur_el = cur_el.parent - - if img_el_parent_div is not None and len(img_el_parent_div.text) != 0: - img_el.attrs["alt"] = img_el_parent_div.text - return - - - def convert_figcaption(self, el, text, convert_as_inline): - return "" - - # 图片后添加空行,图片应该单独在一行后面不接文字(示例文章:6925) - def convert_img(self, el, text, convert_as_inline): - self.try_complete_img_description(el) - img_text = super().convert_img(el, text, convert_as_inline) - - # 5195 出现img标签内出现换行导致 markdown图片显示出现问题 - img_text = img_text.replace("\r\n", "") - img_text = img_text.replace("\n", "") - # 空的img标签直接返回空行 - if img_text == "![]()": - return '\n\n' - - # img 标签使用父级超链接标签中的中大图 - src = el.attrs.get('src', None) or '' - if el.parent is not None and el.parent.name == "a": - href = el.parent.attrs.get('href', None) or '' - href_path = href.rsplit(".", 1)[0] - src_path = src.rsplit(".", 1)[0] - if href_path + "_s" == src_path: - img_text = img_text.replace(src, href) - - if '_s' in img_text: - src_path = src.rsplit(".", 1)[0] - if src_path.endswith('_s'): - original_src_path = src_path[:-2] # 去掉末尾的 '_s' - # 构建原始 URL - original_url = original_src_path + "." + src.split(".")[-1] - if self.is_valid_url(original_url): - img_text = img_text.replace(src, original_url) - - # 转换并下载图片 - return picture_process.process_img_tag(img_text, self.img_download_path) - - @staticmethod - def is_img_describe_strong(el) -> bool: - if el is None or el.parent is None: - return False - - if len(el.contents) == 0: - return False - - # if not isinstance(el.contents[0], NavigableString): - # return False - - img_list = el.parent.findAll("img") - if len(img_list) == 0: - return False - - for img_tag in img_list: - alt = img_tag.get("alt", None) - title = img_tag.get("title", None) - if alt is None and title is None: - continue - - if alt == el.text or title == el.text: - return True - - return False - - def convert_b(self, el, text, convert_as_inline): - # 如果b 标签下只存在一个标题,则该b不做任何处理,避免对标题进行加粗(示例文章:6925) - if len(el.contents) == 1: - title_level = get_markdown_title_level(el.contents[0]) - if title_level != '': - return text - - # 标签中存在标题时,不在对内容进行加粗 - if str_is_title(text): - return text - - if self.is_img_describe_strong(el): - return "" - - text = text.strip(" \t") - suffix = "" - if text.endswith("\n"): - suffix = " \n" - b_text = super().convert_b(el, text, convert_as_inline) - - # 解析完 标签后添加空格。避免出现markdown文档中出现《**1.****版本概述**》(文章2377 4292等) - return " " + b_text + suffix + " " - - convert_strong = convert_b - - # 有可能出现

之后紧接一个标题hdwiki_tmml 故前后添加换行 - def convert_p(self, el, text, convert_as_inline): - if convert_as_inline: - return text - if self.options['wrap']: - text = fill(text, - width=self.options['wrap_width'], - break_long_words=False, - break_on_hyphens=False) - #

标签前后换行 - return '\n\n%s\n\n' % text if text else '' - - def convert_a(self, el, text, convert_as_inline): - prefix, suffix, text = chomp(text) - if not text: - return '' - href = el.get('href') - if self.is_href_img(href): - return text - title = el.get('title') - # 5195 出现img标签内出现换行导致 markdown图片显示出现问题 - if title is not None: - title = title.replace("\n", "") - # For the replacement see #29: text nodes underscores are escaped - if (self.options['autolinks'] - and text.replace(r'\_', '_') == href - and not title - and not self.options['default_title']): - # Shortcut syntax - return '<%s>' % href - if self.options['default_title'] and not title: - title = href - title_part = ' "%s"' % title.replace('"', r'\"') if title else '' - - a_tag = '%s[%s](%s%s)%s' % (prefix, text, href, title_part, suffix) if href else text - return a_tag - - @staticmethod - def is_href_img(href_url) -> bool: - if href_url is None: - return False - file_extension = href_url.split(".")[-1] - # 不是图片不处理 - file_extension = file_extension.lower() - if file_extension not in ["jpg", "jpeg", "png", "gif"]: - return False - - return True - - def convert_li(self, el, text, convert_as_inline): - # 为空的li标签返回空(文章 4347) - if not text.strip(): - return "" - - li_text = super().convert_li(el, text, convert_as_inline) - return li_text - - def convert_td(self, el, text, convert_as_inline): - if "\r\n" in text: - text = text.replace("\r\n", "
") - - if "\n" in text: - text = text.replace("\n", "
") - - return ' ' + text + ' |' - - def convert_hn(self, n, el, text, convert_as_inline): - if convert_as_inline: - return text - - style = self.options['heading_style'].lower() - text = text.rstrip() - if style == UNDERLINED and n <= 2: - line = '=' if n == 1 else '-' - return self.underline(text, line) - hashes = '#' * n - hashes = hashes + " " - if style == ATX_CLOSED: - return '\n\n %s %s %s\n\n' % (hashes, text, hashes) - return '\n\n%s %s\n\n' % (hashes, text) - - @staticmethod - def convert_thead_table(el, text, cell_name, convert_as_inline): - cells = el.find_all(['td', 'th']) - is_headrow = all([cell.name == cell_name for cell in cells]) - overline = '' - underline = '' - if is_headrow and not el.previous_sibling: - # first row and is headline: print headline underline - underline += '| ' + ' | '.join(['---'] * len(cells)) + ' |' + '\n' - elif (not el.previous_sibling - and (el.parent.name == 'table' - or (el.parent.name == 'tbody' - and not el.parent.previous_sibling))): - # first row, not headline, and: - # - the parent is table or - # - the parent is tbody at the beginning of a table. - # print empty headline above this row - overline += '| ' + ' | '.join([''] * len(cells)) + ' |' + '\n' - overline += '| ' + ' | '.join(['---'] * len(cells)) + ' |' + '\n' - return overline + '|' + text + '\n' + underline - - def convert_tr(self, el, text, convert_as_inline): - # 解决table标签下存在thead的问题 (文章4061 1976) - if el and el.parent and el.parent.name == "thead": - return CustomMarkDownConverter.convert_thead_table(el, text, 'td', convert_as_inline) - - # 兼容 table->colgroup、tbody->tr 文章4364 - if (el and el.parent and el.parent.previousSibling - and el.parent.name == "tbody" - and el.parent.previousSibling.name == "colgroup"): - return CustomMarkDownConverter.convert_thead_table(el, text, 'td', convert_as_inline) - - return super().convert_tr(el, text, convert_as_inline) - - def convert_pre(self, el, text, convert_as_inline): - # 文章5192出现pre标签,但内容不是代码。故不额外处理pre标签 - return text - - def escape(self, text): - if not text: - return '' - if self.options['escape_misc']: - # text = re.sub(r'([\\&<`[>~#=+|-])', r'\\\1', text) - text = re.sub(r'([\\&<`[>~#%=+|-])', r'\\\1', text) - # 以下的转义是不必要的 - # text = re.sub(r'([0-9])([.)])', r'\1\\\2', text) - if self.options['escape_asterisks']: - text = text.replace('*', r'\*') - if self.options['escape_underscores']: - text = text.replace('_', r'\_') - return text - - @staticmethod - def convert_span(el, text, convert_as_inline): - # 文章3526出现图片后面紧接图片文本的问题。图片文本在span标签内 - if "style" not in el.attrs: - return text - - style_attr = el.attrs['style'] - - if style_attr is None: - return text - style_content = style_attr.split(';') - # 遍历style属性内容,找到display的值 - for item in style_content: - if 'display' in item: - display_value = item.split(': ')[1] # 获取冒号后的值 - if display_value == "block" and text != "": - return f"\n\n{text}\n\n" - return text - - -def expand_html_table(html) -> tuple[str, bool]: - soup = BeautifulSoup(html, 'html.parser') - tables = soup.find_all('table') - if len(tables) == 0: - return html, False - for table in tables: - # 创建一个二维列表来表示表格 - table_rows = table.find_all('tr') - max_cols = 0 - for row in table_rows: - cols = row.find_all(['td', 'th']) - col_count = sum([int(col.get('colspan', 1)) for col in cols]) - if col_count > max_cols: - max_cols = col_count - - # 初始化一个二维列表来存储最终的表格 - result_table = [] - for _ in range(len(table_rows)): - result_table.append([None] * max_cols) - - # 填充二维列表 - for r, row in enumerate(table_rows): - cols = row.find_all(['td', 'th']) - c = 0 - for col in cols: - while result_table[r][c] is not None: - c += 1 - colspan = int(col.get('colspan', 1)) - rowspan = int(col.get('rowspan', 1)) - for i in range(rowspan): - for j in range(colspan): - # 拆分合并单元格时,重复内容 - result_table[r + i][c + j] = copy.copy(col) - # if j == 0 and i == 0: - # result_table[r + i][c + j] = copy.copy(col) - # else: - # result_table[r + i][c + j] = soup.new_tag('td') - c += colspan - - # 生成新的表格 HTML - new_table = soup.new_tag('table', border="1", cellspacing="0") - tbody = soup.new_tag('tbody') - new_table.append(tbody) - for row in result_table: - tr = soup.new_tag('tr') - for col in row: - if col is not None: - td = soup.new_tag(col.name) - td.string = col.get_text() - tr.append(td) - tbody.append(tr) - - # 替换原始HTML中的旧表格 - table.replace_with(new_table) - - return str(soup), True - - -# Create shorthand method for conversion -def md(html, img_download_path, **options): - new_html, result = expand_html_table(html) - markdown_content = CustomMarkDownConverter(img_download_path, **options).convert(new_html) - # 删除换行符中间的空格 - temp_txt = re.sub(r'\n\s*\n', '\n\n', markdown_content) - # 连续超过3个以上的换行符替换为3个 - temp_txt = re.sub(r'\n{3,}', '\n\n\n', temp_txt) - return temp_txt diff --git a/rag2_0/tool/html_to_md/picture_process.py b/rag2_0/tool/html_to_md/picture_process.py deleted file mode 100755 index 9228de3..0000000 --- a/rag2_0/tool/html_to_md/picture_process.py +++ /dev/null @@ -1,170 +0,0 @@ -import base64 -import hashlib -import logging -import os -import re -import uuid -from urllib.parse import urljoin -import requests - - -def get_img_tag_url(img_tag): - - # 提取图片url的正则表达式模式 - pattern = r'\!\[.*?\]\((.*?)\)' - # 找到第一个匹配的链接 - match = re.search(pattern, img_tag) - if not match: - return "" - - # 获取匹配到的链接 - link = match.group(1) - # 第0个为链接 - link = link.split(" ")[0] - return link - - -# 填充img标签中的图片链接 -# img_tag '![1](http://wiki.jxbw.com/hdwiki/uploads/202303/1679471232U4iPCjtm_s.jpg "1")' -# img_tag '![1](uploads/202303/1679471232U4iPCj6tm_s.jpg "1")' -def fill_img_url(img_tag): - """ - 填充img标签中的图片链接。 - - 参数: - img_tag (str): 原始的img标签 - - 返回: - tuple: 修改后的img标签和图片的完整链接 - """ - # 一个完整的img标签内删除换行符 - img_tag = img_tag.replace("\n", "") - link = get_img_tag_url(img_tag) - if len(link) == 0: - return img_tag, '' - - base_url = os.getenv("IMG_URL_PREFIX") - if "http:" in link: - # 图片为全链接,不替换 - return img_tag, link - elif base_url: - # 补全图片链接 - full_link = urljoin(base_url, link) - img_tag = img_tag.replace(link, full_link) - return img_tag, full_link - else: - return img_tag, '' - - -def download_picture(img_tag, download_path): - headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' - 'Chrome/94.0.4606.71 Safari/537.36 ' - } - img_tag, img_url = fill_img_url(img_tag) - if img_url == '': - return img_tag - # if "_s" in img_tag: - # breakpoint() - file_name = img_url.split("/")[-1] - file_path = os.path.normpath(download_path + "\\" + file_name) - file_path = file_path.replace("\\", "/") - - # 文件已经存在时不下载 - if not os.path.exists(file_path): - img_date = requests.get(url=img_url, headers=headers).content - logging.info(f"图片下载成功:{img_url}") - with open(file_path, 'wb') as fp: - fp.write(img_date) - - # img_tag中的url替换为下载的图片路径 - return img_tag.replace(img_url, file_path) - - -def download_picture_from_other_url(img_tag, download_path): - headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' - 'Chrome/94.0.4606.71 Safari/537.36 ' - } - img_tag, img_url = fill_img_url(img_tag) - # if "_s" in img_tag: - # breakpoint() - file_name = uuid.uuid4() - file_path = os.path.join(download_path, f"{file_name}.png") - file_path = os.path.normpath(file_path) - # 文件已经存在时不下载 - if not os.path.exists(file_path): - try: - img_date = requests.get(url=img_url, headers=headers).content - with open(file_path, 'wb') as fp: - fp.write(img_date) - logging.info(f"图片下载成功:{img_url}") - except Exception as e: - logging.warning(f"img download error url:{img_url}") - return img_tag - - # img_tag中的url替换为下载的图片路径 - return img_tag.replace(img_url, file_path) - - -def extract_base64_from_data_uri(data_uri): - # 分割字符串以找到 base64 部分 - parts = data_uri.split(',') - if len(parts) == 2 and parts[0].endswith('base64'): - # 移除后缀并返回 base64 值 - return parts[1][:-1] - else: - return None - - -def picture_base64(img_tag, picture_save_path): - # 解码Base64字符串 - # ![](data:image/png;base64,1679471232U4iPCj6tm_s) - base64_str = extract_base64_from_data_uri(img_tag) - if picture_save_path is None or picture_save_path == "": - return "![picture](空)" - # 将图片内容做MD5 用作文件名 - hash_object = hashlib.md5() - hash_object.update(base64_str.encode()) - img_md5 = hash_object.hexdigest() - - picture_save_path = picture_save_path + "\\%s.png" % img_md5 - picture_save_path = os.path.normpath(picture_save_path) - picture_save_path = picture_save_path.replace("\\", "/") - - # 文件已经存在时不重新保存 - if not os.path.exists(picture_save_path): - decoded_string = base64.b64decode(base64_str) - with open(picture_save_path, 'wb') as fp: - fp.write(decoded_string) - - # 修改img_tab的图片路径 - match = re.search("\[(.*?)\]", img_tag) - result = "" - if match: - result = match.group(1) - if result == "": - return "![picture](%s)" % picture_save_path - else: - return "![%s](%s \"%s\")" % (result, picture_save_path, result) - - -def process_img_tag(str_img_tag, img_path): - # 如果img标签指向的是本地磁盘路径 则忽略该标签返回空 - if "file:///" in str_img_tag: - logging.warning(f"存在非法的链接地址:{str_img_tag}") - return "" - if img_path is None or img_path == "": - return "![picture](空)" - - img_url = get_img_tag_url(str_img_tag) - if "data:image/png;base64" in str_img_tag: - return picture_base64(str_img_tag, img_path) - # (4696等存在指向外部链接的 img标签。 暂时保留不删除) - elif "http://" in str_img_tag or "https://" in str_img_tag: - return download_picture_from_other_url(str_img_tag, img_path) - elif not img_url.startswith("http"): - return download_picture(str_img_tag, img_path) - else: - logging.warning(f"未处理的图片标签:{str_img_tag}") - return str_img_tag