116 lines
4.2 KiB
Python
116 lines
4.2 KiB
Python
from llama_index.readers.file.markdown import MarkdownReader
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
import re
|
|
from llama_index.core.utils import get_tokenizer
|
|
|
|
|
|
class ChunkMarkdownReader(MarkdownReader):
    """Markdown reader that cuts a table-centric document into token-bounded chunks.

    ``markdown_to_tups`` walks the document line by line. Lines that precede
    the first line starting with ``|`` are kept as a title, the lines up to and
    including the first ``|---`` separator are kept as the table header, and
    the remaining lines are grouped into chunks whose token count (as measured
    by the tokenizer from ``get_tokenizer()``) is kept under ``chunkSize``.
    Every chunk is keyed by the table header; the first chunk's key is
    additionally prefixed with the title.

    The lines seen after the header are also remembered so that ``records``
    and ``findValue`` can query them as table rows.

    NOTE(review): presumably ``markdown_to_tups`` overrides the parent's method
    of the same name -- confirm against the installed ``MarkdownReader``.
    """

    def __init__(
        self,
        *args: Any,
        chunkSize: int = 2048,
        **kwargs: Any,
    ) -> None:
        """Create the reader.

        Args:
            *args: Forwarded unchanged to ``MarkdownReader.__init__``.
            chunkSize: Token budget used when grouping lines into chunks.
            **kwargs: Forwarded unchanged to ``MarkdownReader.__init__``.
        """
        self._chunkSize = chunkSize
        self._tokenizer = get_tokenizer()
        # First line of the most recently detected table header.
        self._colheader: str = ''
        # Every non-empty line seen after the table header was detected.
        self._rows: List[str] = []
        super().__init__(*args, **kwargs)

    def markdown_to_tups(self, markdown_text: str) -> List[Tuple[Optional[str], str]]:
        """Split ``markdown_text`` into ``(header, chunk_text)`` tuples.

        Args:
            markdown_text: Raw markdown; lines may be separated by ``\\r``,
                ``\\n`` or both. Empty lines are ignored.

        Returns:
            One tuple per chunk. The key is the table header (prefixed by the
            title for the first chunk) with ``#`` characters removed and
            surrounding whitespace stripped; the value is the chunk text with
            ``<...>`` tags removed.

        Raises:
            ValueError: If the title alone is found to exceed ``chunkSize``
                (see the review note in the body about reachability).
        """
        # Start from a clean slate so that parsing a second document with the
        # same instance does not mix its rows with those of the previous one.
        self._colheader = ''
        self._rows = []

        lines = [
            line
            for line in self._multi_char_split(markdown_text, '\r\n')
            if line != ''
        ]

        markdown_tups: List[Tuple[Optional[str], str]] = []
        current_lines: List[str] = []
        title = ''
        header = ''
        header_tokens = 0
        tokens = 0
        title_checked = False

        for line in lines:
            line_tokens = self._token_size(line)
            tokens += line_tokens

            if tokens > self._chunkSize and current_lines:
                self._append_chunk(markdown_tups, title, header, current_lines)
                # The next chunk repeats the header and opens with the current
                # line, so both must count against its budget.
                tokens = header_tokens + line_tokens

            # Once a header is known, every following line is kept as a row.
            # The separator line itself is excluded because `header` is only
            # assigned at the end of the iteration that sees it.
            if header != '':
                self._rows.append(line)

            if line.startswith('|') and title == '' and not title_checked:
                # First table line: whatever was collected so far is the title.
                if current_lines:
                    # NOTE(review): this guard cannot currently fire -- when
                    # the budget is exceeded with pending lines, the flush
                    # above has already emptied `current_lines`.
                    if tokens > self._chunkSize:
                        raise ValueError('标题Token数大于chunkSize大小')
                    title = "\n".join(current_lines)
                    current_lines.clear()
                # Only judge the title once, even when there was none.
                title_checked = True

            current_lines.append(line)

            if line.startswith("|---"):
                # Separator row: everything pending is the table header.
                self._colheader = current_lines[0]
                header = "\n".join(current_lines)
                header_tokens = self._token_size(header)
                current_lines.clear()

        if current_lines:
            self._append_chunk(markdown_tups, title, header, current_lines)

        return [
            (
                key if key is None else re.sub(r"#", "", key).strip(),
                re.sub(r"<.*?>", "", value),
            )
            for key, value in markdown_tups
        ]

    @staticmethod
    def _append_chunk(
        chunks: List[Tuple[Optional[str], str]],
        title: str,
        header: str,
        pending: List[str],
    ) -> None:
        """Append ``pending`` to ``chunks`` as one chunk and empty ``pending``.

        The first chunk of a document is keyed by title + header, later chunks
        by the header only.
        """
        if not chunks and title != '':
            key = title + '\n' + header
        else:
            key = header
        chunks.append((key, "\n".join(pending)))
        pending.clear()

    def _token_size(self, text: str) -> int:
        """Return the number of tokens the configured tokenizer yields for ``text``."""
        return len(self._tokenizer(text))

    @staticmethod
    def _split_cells(row: str) -> List[str]:
        """Split a table line on ``|`` and drop the empty edge cells.

        Only the empty strings produced by a leading/trailing pipe are removed;
        empty cells in the middle are kept so values stay under their column.
        """
        cells = row.split('|')
        if cells and cells[0] == '':
            del cells[0]
        if cells and cells[-1] == '':
            del cells[-1]
        return cells

    def findValue(self, expression: str, Field: str) -> str:
        """Return ``Field`` of the first stored row for which ``expression`` is truthy.

        For each stored row a mapping ``{column name: cell text}`` is built and
        ``expression`` is evaluated with that mapping as its globals. Column
        names and cell values are used verbatim (no whitespace stripping).

        Args:
            expression: Python expression evaluated once per row.
            Field: Column name whose value is returned for the matching row.

        Returns:
            The matching row's value for ``Field``, or ``''`` if no row matches.

        Raises:
            KeyError: If the matching row has no ``Field`` entry.
        """
        cols = self._split_cells(self._colheader)

        for row in self._rows:
            cells = self._split_cells(row)
            if not any(cells):
                # Nothing but empty cells: not a data row.
                continue
            gData = dict(zip(cols, cells))
            # SECURITY NOTE: eval() executes arbitrary Python. `expression`
            # must come from a trusted source, never from document content or
            # end-user input.
            if eval(expression, gData):
                return gData[Field]
        return ''

    def records(self) -> List[Dict[str, str]]:
        """Return the stored rows as a list of ``{column name: cell text}`` dicts.

        Both the header and each row are split on ``|`` and the first and last
        pieces (the text outside the outer pipes) are discarded. Lines that
        yield no cells are skipped.
        """
        cols = self._colheader.split('|')[1:-1]
        records = []
        for row in self._rows:
            cells = row.split('|')[1:-1]
            if not cells:
                continue
            records.append(dict(zip(cols, cells)))
        return records

    def _multi_char_split(self, string: str, separators: str) -> List[str]:
        """Split ``string`` at every occurrence of any character in ``separators``.

        Adjacent separators produce empty strings in the result. With no
        separators the string is returned unsplit.
        """
        if not separators:
            # '[]' is not a valid character class; there is nothing to split on.
            return [string]
        # Build one character class out of all separator characters.
        pattern = '[' + re.escape(separators) + ']'
        # Let the regular expression engine do the splitting.
        return re.split(pattern, string)