# File: zjdataai-app/backend/app/engine/loaders/markdownReader.py
# Size: 90 lines, 3.1 KiB (Python)

from llama_index.readers.file.markdown import MarkdownReader
from typing import Any, Dict, List, Optional, Tuple
import re
from llama_index.core.utils import get_tokenizer
class ChunkMarkdownReader(MarkdownReader):
    """Markdown reader that cuts a markdown table into token-bounded chunks.

    The parser looks for an optional title block, a table header (the
    column-name row followed by a ``|---`` separator row) and then data
    rows.  Data rows are grouped into chunks whose token count, including
    the header, is kept within ``chunkSize``.  Every chunk is keyed by the
    table header so it can be interpreted without the preceding chunks; the
    first chunk's key additionally carries the title.

    While parsing, the column-name row and every line seen after the header
    are remembered on the instance so that :meth:`findValue` can query them.

    NOTE(review): ``_rows`` is never cleared, so lines accumulate across
    successive ``markdown_to_tups`` calls on the same instance while
    ``_colheader`` is overwritten by the latest header -- confirm this is
    the intended behaviour when one reader parses several files.
    """

    def __init__(
        self,
        *args: Any,
        chunkSize: int = 2048,
        **kwargs: Any,
    ) -> None:
        """Create the reader.

        Args:
            *args: Forwarded unchanged to ``MarkdownReader.__init__``.
            chunkSize: Token budget used when deciding where to cut chunks.
            **kwargs: Forwarded unchanged to ``MarkdownReader.__init__``.
        """
        self._chunkSize = chunkSize
        self._tokenizer = get_tokenizer()
        # Column-name row of the most recently parsed table header.
        self._colheader = ''
        # Raw lines collected after a table header has been seen.
        self._rows: List[str] = []
        super().__init__(*args, **kwargs)

    def markdown_to_tups(self, markdown_text: str) -> List[Tuple[Optional[str], str]]:
        """Split ``markdown_text`` into ``(key, body)`` chunks.

        The text is split on ``"\\n"`` and processed line by line:

        * Before a table header has been seen, a line equal to ``"\\r"``
          (a blank line in CRLF text) ends the title block: the buffered
          lines become the title.  Because the text is split on ``"\\n"``,
          a blank line in LF-only text is ``""`` and does not trigger this;
          in that case the title lines simply stay in the header block.
        * A line starting with ``|---`` ends the table header: the buffered
          lines become the header, and the line directly before the
          separator is stored as the column-name row.
        * Whenever adding a line would push the running token count above
          ``chunkSize`` and lines are buffered, the buffer is emitted as a
          chunk and counting restarts from the header's token size plus the
          current line.

        Args:
            markdown_text: Full markdown document text.

        Returns:
            A list of ``(key, body)`` tuples.  ``key`` is ``title + header``
            for the first chunk and ``header`` for later chunks, with ``#``
            characters removed and surrounding whitespace stripped.  ``body``
            is the chunk's lines joined by ``"\\n"`` with ``<...>`` tags
            removed.

        Raises:
            ValueError: If the token count exceeds ``chunkSize`` at the
                moment the title block is closed.
        """
        markdown_tups: List[Tuple[Optional[str], str]] = []
        title = ''
        header = ''
        header_tokens = 0
        token_count = 0
        pending: List[str] = []

        for line in markdown_text.split("\n"):
            line_tokens = self._token_size(line)
            token_count += line_tokens
            if token_count > self._chunkSize and pending:
                self._append_chunk(markdown_tups, title, header, pending)
                # The overflowing line opens the next chunk, so its tokens
                # must count towards that chunk's budget (they were
                # previously dropped, letting chunks exceed chunkSize).
                token_count = header_tokens + line_tokens
            pending.append(line)

            # Remember every post-header line for findValue().  Requiring a
            # captured title as well made findValue() unusable for documents
            # without a detectable title block.
            if header:
                self._rows.append(line)

            # Only a blank line *before* the table header closes the title.
            # After the header, blank lines are ordinary content; treating
            # them as a title end used to discard the buffered table rows.
            if line == '\r' and not header:
                if token_count > self._chunkSize:
                    raise ValueError('标题Token数大于chunkSize大小')
                title = "\n".join(pending)
                pending.clear()

            if line.startswith("|---"):
                # The column-name row is the line right before the separator;
                # the first buffered line may be something else (e.g. a title
                # that was never split off).  With a single buffered line
                # there is no preceding row, so fall back to that line.
                self._colheader = pending[-2] if len(pending) >= 2 else pending[0]
                header = "\n".join(pending)
                header_tokens += self._token_size(header)
                pending.clear()

        if pending:
            self._append_chunk(markdown_tups, title, header, pending)

        return [
            (
                key if key is None else re.sub(r"#", "", key).strip(),
                re.sub(r"<.*?>", "", value),
            )
            for key, value in markdown_tups
        ]

    @staticmethod
    def _append_chunk(
        chunks: List[Tuple[Optional[str], str]],
        title: str,
        header: str,
        body_lines: List[str],
    ) -> None:
        """Append ``body_lines`` to ``chunks`` as one chunk and empty the buffer.

        Only the first chunk's key carries the title; later chunks are keyed
        by the header alone.
        """
        key = header if chunks else title + header
        chunks.append((key, "\n".join(body_lines)))
        body_lines.clear()

    def _token_size(self, text: str) -> int:
        """Return the number of tokens the configured tokenizer yields for ``text``."""
        return len(self._tokenizer(text))

    @staticmethod
    def _split_row(row: str) -> List[str]:
        """Split one ``|``-delimited table row into its cells.

        Trailing CR/LF and the optional outer pipes are removed first, so
        interior empty cells are preserved and stay aligned with their
        columns.  Cell text is otherwise returned unchanged (no stripping).
        A line with no cell content yields an empty list.
        """
        text = row.rstrip('\r\n')
        if text.startswith('|'):
            text = text[1:]
        if text.endswith('|'):
            text = text[:-1]
        return text.split('|') if text else []

    def findValue(self, expression: str, Field: str) -> str:
        """Return column ``Field`` of the first recorded row matching ``expression``.

        Each recorded row is turned into a mapping from column name to cell
        text (both taken verbatim from the table, as strings), and
        ``expression`` is evaluated with that mapping as its global
        namespace, so column names can be used as variables.

        Args:
            expression: Python expression evaluated once per row.
            Field: Name of the column whose value is returned on a match.

        Returns:
            The matching row's value for ``Field``, or ``''`` when no row
            matches.

        Raises:
            KeyError: If the matching row has no ``Field`` column.
            Exception: Whatever evaluating ``expression`` raises, e.g.
                ``NameError`` when it references a column a row lacks.
        """
        cols = self._split_row(self._colheader)
        for row in self._rows:
            cells = self._split_row(row)
            if not cells:
                continue
            record = dict(zip(cols, cells))
            # NOTE(security): eval() executes arbitrary Python. `expression`
            # must come from trusted code only, never from end-user input.
            if eval(expression, record):
                return record[Field]
        return ''