上传文件至 /

4.3更新
This commit is contained in:
2025-04-03 17:24:18 +08:00
parent c152fb8714
commit 9b9453a1a3
2 changed files with 196 additions and 146 deletions
+156 -143
View File
@@ -1,143 +1,156 @@
""" """
=================================== ===================================
@AutherWenZ @AutherWenZ
@Company: BooWay @Company: BooWay
@projectbooway_dm @projectbooway_dm
=================================== ===================================
""" """
# import spacy # import spacy
# import zh_core_web_sm, zh_core_web_md, zh_core_web_lg, zh_core_web_trf # import zh_core_web_sm, zh_core_web_md, zh_core_web_lg, zh_core_web_trf
# #
# # nlp_sm = zh_core_web_sm.load() # # nlp_sm = zh_core_web_sm.load()
# # nlp_md = zh_core_web_md.load() # # nlp_md = zh_core_web_md.load()
# # nlp_lg = zh_core_web_lg.load() # # nlp_lg = zh_core_web_lg.load()
# nlp_trf = zh_core_web_trf.load() # nlp_trf = zh_core_web_trf.load()
# #
# polite_words = {"你好", "您好", "请", "请问", "谢谢", "不客气", "麻烦", "打扰", "拜托", "辛苦", "劳驾"} # polite_words = {"你好", "您好", "请", "请问", "谢谢", "不客气", "麻烦", "打扰", "拜托", "辛苦", "劳驾"}
# 停用词清理 # 停用词清理
def stop_word_processing(input_str, nlp_stytle, polite_words): def stop_word_processing(input_str, nlp_stytle, polite_words):
doc = nlp_stytle(input_str) doc = nlp_stytle(input_str)
# 去除停用词 # 去除停用词
filtered_tokens = [ filtered_tokens = [
token.text for token in doc token.text for token in doc
if not token.is_stop and not token.is_punct and not token.is_space and token.text not in polite_words] if not token.is_stop and not token.is_punct and not token.is_space and token.text not in polite_words]
return ''.join(filtered_tokens) return ''.join(filtered_tokens)
# 后缀名检测 # 后缀名检测
def extract_names_from_json(file_path): def extract_names_from_json(file_path):
import json import json
with open(file_path, 'r', encoding='utf-8') as file: with open(file_path, 'r', encoding='utf-8') as file:
data = json.load(file) data = json.load(file)
# 确保数据是一个列表 # 确保数据是一个列表
if isinstance(data, list): if isinstance(data, list):
names = [item.get("name") for item in data if "name" in item] names = [item.get("name") for item in data if "name" in item]
return names return names
else: else:
raise ValueError("JSON 文件的格式应为包含对象的列表") raise ValueError("JSON 文件的格式应为包含对象的列表")
def judge_define_suffix(input_str): def judge_define_suffix(input_str):
suffix_file_path = "../data/booway_knowledge_base/keywords_kg/suffix_keywords.json" suffix_file_path = "../data/booway_knowledge_base/keywords_kg/suffix_keywords.json"
suffix_fields = extract_names_from_json(suffix_file_path) suffix_fields = extract_names_from_json(suffix_file_path)
suffix_fields.extend(['gec5', 'bczc2', 'xzwb2', 'BPQ', 'BPY']) suffix_fields.extend(['gec5', 'bczc2', 'xzwb2', 'BPQ', 'BPY'])
import re import re
# 构建正则表达式模式,匹配大小写不敏感且前面可能带有. # 构建正则表达式模式,匹配大小写不敏感且前面可能带有.
# 去掉 \b 以允许字段是其他字符串的一部分 # 去掉 \b 以允许字段是其他字符串的一部分
pattern = r'(?:\.?)(' + '|'.join(re.escape(field) for field in suffix_fields) + r')' pattern = r'(?:\.?)(' + '|'.join(re.escape(field) for field in suffix_fields) + r')'
# 使用 re.IGNORECASE 标志来忽略大小写 # 使用 re.IGNORECASE 标志来忽略大小写
if re.search(pattern, input_str, re.IGNORECASE): if re.search(pattern, input_str, re.IGNORECASE):
return True return True
else: else:
return False return False
def match_suffix(input_str): def match_suffix(input_str):
import re import re
# 修改正则表达式,匹配连续字母组合或字母+数字组合 # 修改正则表达式,匹配连续字母组合或字母+数字组合
segments = re.findall(r'[A-Za-z]+(?:[0-9]+)?', input_str) segments = re.findall(r'[A-Za-z]+(?:[0-9]+)?', input_str)
# 过滤条件:必须是包含字母的组合,且可以包含字母或字母+数字 # 过滤条件:必须是包含字母的组合,且可以包含字母或字母+数字
matches = [seg for seg in segments if any(c.isalpha() for c in seg)] matches = [seg for seg in segments if any(c.isalpha() for c in seg)]
return matches[0] if matches else '未知' return matches[0] if matches else '未知'
def retrieve_relevant_software(suffix_name): def retrieve_relevant_software(suffix_name):
import json import json
suffix_file_path = "../data/booway_knowledge_base/keywords_kg/suffix_keywords.json" suffix_file_path = "../data/booway_knowledge_base/keywords_kg/suffix_keywords.json"
with open(suffix_file_path, 'r', encoding='utf-8') as file: with open(suffix_file_path, 'r', encoding='utf-8') as file:
data = json.load(file) data = json.load(file)
suffix_name_lower = suffix_name.lower() suffix_name_lower = suffix_name.lower()
for item in data: for item in data:
item_name = item.get('name', '').lower() item_name = item.get('name', '').lower()
if item_name == suffix_name_lower: if item_name == suffix_name_lower:
return item.get('description', {}).get('software_name', 0) return item.get('description', {}).get('software_name', 0)
return 0 return 0
def get_keywords(input_str): def get_keywords(input_str):
import re import re
matches = re.findall(r'【(.*?)】', input_str) matches = re.findall(r'【(.*?)】', input_str)
# 获取第二个【】里的内容(索引为1 # 获取第二个【】里的内容(索引为1
if len(matches) >= 2: if len(matches) >= 2:
second = matches[1] second = matches[1]
return second return second
else: else:
return None return None
def get_keywords_v2(input_str): def get_keywords_v2(input_str):
import re import re
match = re.search(r'】(.*)', input_str) match = re.search(r'】(.*)', input_str)
if match: if match:
output = match.group(1) output = match.group(1)
return output return output
else: else:
return None return None
def get_keywords_v3(input_str): def get_keywords_v3(input_str):
import re import re
match = re.findall(r'【(.*?)】', input_str) match = re.findall(r'【(.*?)】', input_str)
if match: if match:
output = match[1:] output = match[1:]
return output return output
else: else:
return None return None
def get_keywords_v4(input_str):
def normalize_text(text: str, synonym_dict: dict) -> str: import re
import re matches = re.findall(r'【(.*?)】', input_str)
# 构建同义词到主词的映射表(扁平化)
flat_synonyms = {} # 获取第一个和第二个【】里的内容(索引为0和1)
for main_word, synonyms in synonym_dict.items(): first = matches[0] if len(matches) >= 1 else None
for syn in synonyms: second = matches[1] if len(matches) >= 2 else None
flat_synonyms[syn] = main_word
return first, second
# 按长度从大到小排序,避免短词覆盖长词(如“下载”和“下载下来”)
sorted_synonyms = sorted(flat_synonyms.keys(), key=len, reverse=True)
# 逐个替换 def normalize_text(text: str, synonym_dict: dict) -> str:
for syn in sorted_synonyms: import re
# 用正则确保是整词匹配,但保留灵活性(可处理“...费费率...”这种词连着的情况 # 构建同义词到主词的映射表(扁平化
text = re.sub(re.escape(syn), flat_synonyms[syn], text) flat_synonyms = {}
for main_word, synonyms in synonym_dict.items():
return text for syn in synonyms:
flat_synonyms[syn] = main_word
# 按长度从大到小排序,避免短词覆盖长词(如“下载”和“下载下来”)
sorted_synonyms = sorted(flat_synonyms.keys(), key=len, reverse=True)
# 逐个替换
for syn in sorted_synonyms:
# 用正则确保是整词匹配,但保留灵活性(可处理“...费费率...”这种词连着的情况)
text = re.sub(re.escape(syn), flat_synonyms[syn], text)
return text
+40 -3
View File
@@ -1,9 +1,44 @@
import os import os
from langchain_community.vectorstores import FAISS from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings # from langchain_huggingface import HuggingFaceEmbeddings
embedding_path = "/data/Z_LLM_data/Embed_data/bge-m3" # embedding_path = "/data/Z/Z_llm_dm/vector_data/bge-m3"
embeddings = HuggingFaceEmbeddings(model_name=embedding_path) # embeddings = HuggingFaceEmbeddings(model_name=embedding_path)
from typing import List
import requests
from langchain.embeddings.base import Embeddings
class SiliconFlowEmbeddings(Embeddings):
def __init__(self, api_key: str, model: str = "bge-m3"):
self.api_key = api_key
self.model = model
self.url = "http://10.1.16.39:9995/v1/embeddings"
self.headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
def _embed(self, input: List[str]) -> List[List[float]]:
payload = {
"model": self.model,
"input": input,
"encoding_format": "float"
}
response = requests.post(self.url, json=payload, headers=self.headers)
response.raise_for_status()
data = response.json()
return [item["embedding"] for item in data["data"]]
def embed_documents(self, texts: List[str]) -> List[List[float]]:
return self._embed(texts)
def embed_query(self, text: str) -> List[float]:
return self._embed([text])[0]
embeddings = SiliconFlowEmbeddings(api_key="sk-ftnofbucchwnscojohyxwmfzgaykdxihafnlphohsinftkbr")
def Mixed_retrieval(input_path): def Mixed_retrieval(input_path):
file_name = os.path.splitext(os.path.basename(input_path))[0] file_name = os.path.splitext(os.path.basename(input_path))[0]
@@ -33,6 +68,8 @@ def Mixed_retrieval(input_path):
return retriever_txt_faiss1, retriever_txt_faiss2, retriever_txt_faiss3 return retriever_txt_faiss1, retriever_txt_faiss2, retriever_txt_faiss3
def interface_search(input_str, retriever_txt_faiss1, retriever_txt_faiss2, retriever_txt_faiss3): def interface_search(input_str, retriever_txt_faiss1, retriever_txt_faiss2, retriever_txt_faiss3):
index_keyword1 = [] index_keyword1 = []
for i in retriever_txt_faiss1.invoke(input_str): for i in retriever_txt_faiss1.invoke(input_str):