Files
langchain_KG/kg_lab_6.13/xml_to_json.py
T
zoujiwen 850f0476c1 上传文件至 kg_lab_6.13
7.3 优化条件分支
2025-07-03 09:59:59 +08:00

563 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import chardet
import xml.etree.ElementTree as ET
import json
def read_xml_as_string(file_path):
    """Read a file as text, decoding it with the encoding guessed by chardet.

    Args:
        file_path: Path of the file to read.

    Returns:
        The decoded file content as a ``str``.

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the bytes are not valid in the chosen encoding.
    """
    # The whole file is read in binary mode and handed to the detector.
    with open(file_path, 'rb') as f:
        raw_data = f.read()
    # chardet reports ``encoding`` as None when it cannot make a guess
    # (for example on an empty file). Fall back to UTF-8 in that case instead
    # of letting ``bytes.decode(None)`` fail with a TypeError.
    encoding = chardet.detect(raw_data).get('encoding') or 'utf-8'
    return raw_data.decode(encoding)
def _expand_code_range(code_range):
    """Expand one code token such as ``YX5-67~69`` into its individual codes.

    A token without "~" is returned unchanged. A token that contains "~" but
    does not have the ``<prefix>-<start>~<end>`` shape with integer bounds is
    also returned unchanged instead of raising.
    """
    if "~" not in code_range:
        return [code_range]
    try:
        # Both unpackings and both int() calls raise ValueError on malformed
        # input, so they all live inside the guarded section.
        prefix, range_part = code_range.rsplit("-", 1)
        start_str, end_str = range_part.split("~")
        start, end = int(start_str), int(end_str)
    except ValueError:
        return [code_range]
    return [f"{prefix}-{num}" for num in range(start, end + 1)]


def parse_keyword(keyword, indicator_name):
    """Split a raw keyword expression into a mapping rule and mapped names.

    The following forms are recognised, checked in this order:

    1. Range table - the keyword contains "||". Each "||"-separated part of
       the form ``codes@@value`` becomes one Markdown table row; the codes
       (separated by "、") are collected, with ``prefix-a~b`` ranges expanded
       to ``prefix-a`` ... ``prefix-b``. Parts without "@@" are ignored.
    2. Formula - the keyword contains "/", "(" and ")". The text before the
       first "/" plus the "+"-separated terms inside the first parentheses
       are returned as mapped names.
    3. Sum - the keyword contains "+"; the "+"-separated terms are returned.
    4. Anything else - the keyword itself is the only mapped name.

    Args:
        keyword: The raw keyword string.
        indicator_name: Currently unused; kept so existing callers keep
            working.

    Returns:
        A dict with the keys "映射规则" (the rule text) and "指标映射"
        (list of mapped names).
    """
    # Range expression (contains the "||" separator).
    if "||" in keyword:
        table_rows = []
        all_codes = []
        for part in keyword.split("||"):
            if "@@" not in part:
                continue
            codes_str, value = part.split("@@", 1)
            for code_range in codes_str.split("、"):
                all_codes.extend(_expand_code_range(code_range))
            table_rows.append(f"| {codes_str} | {value} |")
        rule_table = "| 资源识别规则 | 指标值 |\n|-------|-------|\n" + "\n".join(table_rows)
        return {"映射规则": rule_table, "指标映射": all_codes}

    # Formula (contains "/" and parentheses).
    if "/" in keyword and "(" in keyword and ")" in keyword:
        # Numerator: everything before the first "/".
        molecule = keyword.split("/")[0].strip()
        # Denominator: the text between the first "(" and the first ")".
        denominator_start = keyword.find("(") + 1
        denominator_end = keyword.find(")")
        denominator_expr = keyword[denominator_start:denominator_end]
        denominator_items = [item.strip() for item in denominator_expr.split("+")]
        return {"映射规则": keyword, "指标映射": [molecule] + denominator_items}

    # Plain sum of terms.
    if "+" in keyword:
        items = [item.strip() for item in keyword.split("+")]
        return {"映射规则": keyword, "指标映射": items}

    # Default: an ordinary keyword.
    return {"映射规则": keyword, "指标映射": [keyword]}
def xml_to_json2(xml_content):
    """Convert indicator ``record`` elements of an XML document to a JSON string.

    Every element matched by ``.//records/record`` is read through its
    attributes and its "关键字" attribute is parsed with ``parse_keyword``.
    How the "指标描述" entry is built depends on the "数据来源" attribute:

    * "报表指标" / "指标库": the parsed mapping list and rule are emitted
      unchanged.
    * "主材单价", "主材参数", "主材数量", "定额参数", "定额数量",
      "工程费用", "项目划分费用": the mapping becomes a sentence. When the
      "指标提取范围" attribute is present (even if empty) the sentence names
      that scope and the last two characters of the data source; otherwise it
      names the data source itself.
    * Any other value: the record is skipped.

    Args:
        xml_content: The XML document as a string.

    Returns:
        A JSON array string (``ensure_ascii=False``, 4-space indent).
    """
    # Sources whose parsed keyword is emitted as-is.
    passthrough_sources = ("报表指标", "指标库")
    # Sources whose mapping is rendered as a descriptive sentence.
    described_sources = (
        "主材单价",
        "主材参数",
        "主材数量",
        "定额参数",
        "定额数量",
        "工程费用",
        "项目划分费用",
    )

    root = ET.fromstring(xml_content)
    result = []
    for record in root.findall('.//records/record'):
        data_sources = record.get("数据来源")
        indicator_name = record.get("指标名称")
        index_extraction_scope = record.get("指标提取范围")
        # The keyword is parsed for every record, before the source is checked.
        parsed = parse_keyword(record.get("关键字"), indicator_name)
        names = parsed["指标映射"]
        rule = parsed["映射规则"]

        if data_sources in passthrough_sources:
            description = {"指标映射": names, "映射规则": rule}
        elif data_sources in described_sources:
            if index_extraction_scope is not None:
                # Last two characters of the source name, e.g. "单价" or "参数".
                suffix = data_sources[-2:]
                mapping = f"从项目划分【{index_extraction_scope}】下所有子孙项目划分中查找名称属于{names}的所有{suffix}"
            else:
                mapping = f"从【{data_sources}】中获取{names}的属性"
            description = {"指标映射": mapping, "映射规则": f"{rule}"}
        else:
            # Unknown data sources produce no output entry.
            continue

        # Key order is kept stable because it determines the JSON layout.
        result.append({
            "指标名称": indicator_name,
            "指标描述": description,
            "code": "",
            "单位": record.get("单位"),
            "单价类型": record.get("单价类型"),
            "序号": record.get("序号"),
            "提取方式": record.get("提取方式"),
            "指标类型": record.get("指标类型"),
            "数据来源": data_sources,
        })
    return json.dumps(result, ensure_ascii=False, indent=4)
def xml_to_json(xml_content):
    """Serialise the indicator records of an XML document as a JSON string.

    Each element matched by ``.//records/record`` yields one object. Its
    "指标描述" entry holds the rule returned by ``parse_keyword`` plus a
    mapping that depends on the "数据来源" attribute:

    * "项目划分费用": a sentence naming the "指标提取范围" attribute and the
      indicator name.
    * "主材单价", "主材参数", "主材数量", "定额参数", "定额数量",
      "工程费用": a sentence naming the extraction scope when that attribute
      is non-empty, otherwise a sentence naming the data source.
    * Everything else (including "报表指标" and "指标库"): the parsed
      mapping list unchanged.

    Args:
        xml_content: The XML document as a string.

    Returns:
        A JSON array string (``ensure_ascii=False``, 4-space indent).
    """
    division_sources = {"项目划分费用"}
    sentence_sources = {"主材单价", "主材参数", "主材数量", "定额参数", "定额数量", "工程费用"}

    converted = []
    for node in ET.fromstring(xml_content).findall('.//records/record'):
        source = node.get("数据来源")
        name = node.get("指标名称")
        scope = node.get("指标提取范围")
        keyword_info = parse_keyword(node.get("关键字"), name)
        names = keyword_info["指标映射"]

        if source in division_sources:
            mapping = f"查找一下项目划分节点【{scope}】下费用预览的【{name}】"
        elif source in sentence_sources:
            if scope:
                # Last two characters of the source name, e.g. "单价" or "参数".
                tail = source[-2:]
                mapping = f"从项目划分【{scope}】下所有子孙项目划分中查找名称属于{names}的所有{tail}"
            else:
                mapping = f"从【{source}】中获取{names}的属性"
        else:
            # Direct sources and unrecognised sources both keep the raw list.
            mapping = names

        converted.append({
            "指标名称": name,
            "code": "",
            "单位": node.get("单位"),
            "单价类型": node.get("单价类型"),
            "序号": node.get("序号"),
            "提取方式": node.get("提取方式"),
            "指标类型": node.get("指标类型"),
            "数据来源": source,
            "指标描述": {
                "指标映射": mapping,
                "映射规则": keyword_info["映射规则"],
            },
        })
    return json.dumps(converted, ensure_ascii=False, indent=4)
def transform_text(text, input_str="【FFFFF】"):
    """Rewrite the first ``[...]`` group in *text* with a prefix and a "】".

    The first occurrence of ``[content]`` (shortest match, not spanning line
    breaks) is replaced by ``input_str + content + "】"``. Later bracket
    groups are left untouched.

    Args:
        text: The string to rewrite.
        input_str: Text inserted in place of the opening "[".

    Returns:
        The rewritten string, or *text* unchanged when it contains no
        ``[...]`` group.
    """
    import re

    match = re.search(r"\[(.*?)\]", text)
    if match is None:
        # Nothing to replace: return the input as-is.
        return text
    # The result is assembled by slicing instead of ``re.sub`` with a
    # replacement string: ``re.sub`` would interpret backslashes in the
    # bracket content or in ``input_str`` as template escapes or group
    # references (e.g. "\1" duplicates the content, "\d" raises re.error).
    return f"{text[:match.start()]}{input_str}{match.group(1)}】{text[match.end():]}"
def replace_last_brackets(s):
    """Turn the last innermost ``[...]`` group of *s* into ``【...】``.

    Only groups whose content contains neither "[" nor "]" are considered.
    When *s* has no such group it is returned unchanged.
    """
    import re

    # Walk over every match and keep only the final one.
    final = None
    for final in re.finditer(r'\[([^\[\]]*)\]', s):
        pass
    if final is None:
        return s

    head = s[:final.start()]
    tail = s[final.end():]
    return "".join((head, '【', final.group(1), '】', tail))
def parse_indicator_string_to_json(indicator_str: str, output_path: str = "output.json"):
    """Post-process a JSON indicator list and write it to *output_path*.

    Two passes are made over the decoded list:

    1. Cleanup of "指标描述"/"指标映射":
       * a string containing "【工程费用】" has every shortest "@...." run
         (from "@" up to the next ".") removed;
       * a list whose first element is a string wrapped in "(" and ")" loses
         that first element.
    2. Bracket rewriting of string mappings, chosen by "数据来源":
       "定额数量" uses ``transform_text`` with "【定额】", "主材数量" and
       "主材单价" with "【主材】", "设备数量" with "【设备】"; every other
       source goes through ``replace_last_brackets``. Mappings that are not
       strings, and items without a "指标描述" dict, are left untouched.

    Args:
        indicator_str: JSON text holding a list of indicator objects.
        output_path: Destination file, written as UTF-8 with 2-space indent.

    Returns:
        The string "结果已保存" on success, or a dict with an "error" key
        when *indicator_str* is not valid JSON.
    """
    import re

    try:
        result = json.loads(indicator_str)
    except json.JSONDecodeError as e:
        return {"error": f"JSON解析失败: {str(e)}"}

    # Pass 1: cleanup.
    for item in result:
        mapping = item.get("指标描述", {}).get("指标映射")
        if isinstance(mapping, str):
            if "【工程费用】" in mapping:
                item["指标描述"]["指标映射"] = re.sub(r"@.*?\.", "", mapping)
        elif isinstance(mapping, list):
            # Guard against an empty list or a non-string first element, both
            # of which used to raise here.
            if mapping and isinstance(mapping[0], str):
                first_elem = mapping[0]
                if first_elem.startswith("(") and first_elem.endswith(")"):
                    del mapping[0]

    # Pass 2: bracket rewriting.
    prefix_by_source = {
        "定额数量": "【定额】",
        "主材数量": "【主材】",
        "主材单价": "【主材】",
        "设备数量": "【设备】",
    }
    for item in result:
        description = item.get("指标描述")
        if not isinstance(description, dict):
            continue
        mapping = description.get("指标映射")
        if not isinstance(mapping, str):
            # Both rewriting helpers operate on text only.
            continue
        source = item.get("数据来源")
        prefix = prefix_by_source.get(source) if isinstance(source, str) else None
        if prefix is not None:
            description["指标映射"] = transform_text(mapping, input_str=prefix)
        else:
            description["指标映射"] = replace_last_brackets(mapping)

    # Save the processed list as a JSON file.
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False, indent=2)
    return ("结果已保存")
# Script section: these statements run whenever the module is executed or
# imported, since they are not wrapped in an ``if __name__ == "__main__"`` guard.
# Read the source XML as text.
xml_content = read_xml_as_string('./data/主网架空线路造价分析指标.xml')
# Convert the records to a JSON string.
json_output = xml_to_json(xml_content)
# Post-process the JSON and write it to the result file.
parse_indicator_string_to_json(json_output, output_path= "./data/result6.27.json")
# Prints a "conversion finished" message.
print("转换完毕!")