Files
langchain_KG/kg_lab_6.13/xml_to_json.py
T
zoujiwen c284fcbdfd 上传文件至 kg_lab_6.13
6.25
1. 更新将全部关键信息转换
2. 设计对于所有指标映射的补全规则
2025-06-25 11:09:39 +08:00

383 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import chardet
import xml.etree.ElementTree as ET
import json
def read_xml_as_string(file_path):
    """Read a file and decode it to text using the encoding chardet detects.

    Args:
        file_path: Path of the file to read.

    Returns:
        The complete file content as a ``str``.

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the bytes cannot be decoded with the chosen
            encoding.
    """
    # The whole file is read as bytes and handed to the detector.
    with open(file_path, 'rb') as f:
        raw_data = f.read()
    # chardet reports ``None`` as the encoding when it cannot identify one
    # (for example on an empty file); ``bytes.decode(None)`` would raise
    # TypeError, so fall back to UTF-8 in that case.
    encoding = chardet.detect(raw_data)['encoding'] or 'utf-8'
    return raw_data.decode(encoding)
def _expand_code_range(code_range):
    """Expand one code such as ``YX5-67~69`` into its individual codes.

    A code without ``~`` is returned unchanged. A code whose range part cannot
    be parsed (no ``-`` prefix separator, more than one ``~``, non-numeric
    bounds) or whose bounds are descending is also returned unchanged, so no
    code is ever dropped or allowed to abort the conversion.
    """
    if "~" not in code_range:
        return [code_range]
    try:
        # Both unpackings raise ValueError on an unexpected shape, so they
        # live inside the ``try`` together with the int conversions.
        prefix, range_part = code_range.rsplit("-", 1)
        start_str, end_str = range_part.split("~")
        start = int(start_str)
        end = int(end_str)
    except ValueError:
        return [code_range]
    if start > end:
        # An empty range() would silently lose the code; keep it as written.
        return [code_range]
    # NOTE(review): leading zeros are not preserved ("07~09" expands to
    # 7, 8, 9) -- confirm whether zero-padded codes occur in the data.
    return [f"{prefix}-{num}" for num in range(start, end + 1)]


def parse_keyword(keyword, indicator_name):
    """Split a raw keyword expression into a mapping rule and indicator codes.

    Four shapes are recognised, checked in this order:

    1. Range table: ``codes@@value`` segments joined by ``||``. Codes inside
       a segment are separated by ``、`` and may be numeric ranges such as
       ``YX5-67~69``. Segments without ``@@`` are ignored.
    2. Ratio: the text contains ``/``, ``(`` and ``)``, e.g. ``A/(B+C)``.
       The text before the first ``/`` and the ``+``-separated terms inside
       the first parentheses become the codes.
    3. Sum: the text contains ``+``; each term becomes a code.
    4. Anything else is a plain keyword and is its own single code.

    Args:
        keyword: Raw keyword text. ``None`` is treated as an empty string.
        indicator_name: Indicator name. Not used by the current rules; kept
            so existing callers keep working.

    Returns:
        A dict with two keys: ``"映射规则"`` (a ``str``: a Markdown table for
        the range-table shape, otherwise the keyword itself) and
        ``"指标映射"`` (a list of code strings).
    """
    if keyword is None:
        keyword = ""

    # Range table ("||"-separated "codes@@value" segments).
    if "||" in keyword:
        table_rows = []
        all_codes = []
        for part in keyword.split("||"):
            if "@@" not in part:
                continue
            codes_str, value = part.split("@@", 1)
            for code_range in codes_str.split("、"):
                all_codes.extend(_expand_code_range(code_range))
            table_rows.append(f"| {codes_str} | {value} |")
        rule_table = "| 资源识别规则 | 指标值 |\n|-------|-------|\n" + "\n".join(table_rows)
        return {"映射规则": rule_table, "指标映射": all_codes}

    # Ratio formula: numerator before "/", denominator terms inside "(...)".
    if "/" in keyword and "(" in keyword and ")" in keyword:
        molecule = keyword.split("/")[0].strip()
        denominator_start = keyword.find("(") + 1
        denominator_end = keyword.find(")")
        denominator_expr = keyword[denominator_start:denominator_end]
        denominator_items = [item.strip() for item in denominator_expr.split("+")]
        return {"映射规则": keyword, "指标映射": [molecule] + denominator_items}

    # Plain sum of terms.
    if "+" in keyword:
        items = [item.strip() for item in keyword.split("+")]
        return {"映射规则": keyword, "指标映射": items}

    # Plain keyword.
    return {"映射规则": keyword, "指标映射": [keyword]}
# Data sources whose parsed code list is emitted unchanged.
_RAW_MAPPING_SOURCES = frozenset({"报表指标", "指标库"})

# Data sources whose code list is wrapped in a descriptive sentence.
_DESCRIBED_MAPPING_SOURCES = frozenset({
    "主材单价",
    "主材参数",
    "主材数量",
    "定额参数",
    "定额数量",
    "工程费用",
    "项目划分费用",
})


def _describe_mapping(data_sources, index_extraction_scope, codes):
    """Build the descriptive "指标映射" sentence for a described data source."""
    if index_extraction_scope is not None:
        # The last two characters of the source name (e.g. "单价", "数量")
        # name the attribute being collected.
        attribute = data_sources[-2:]
        return f"从项目划分【{index_extraction_scope}】下所有子孙项目划分中查找名称属于{codes}的所有{attribute}"
    return f"从【{data_sources}】中获取{codes}的属性"


def xml_to_json(xml_content):
    """Convert the indicator records of an XML document into a JSON string.

    Every ``records/record`` element is read through its attributes. The
    ``关键字`` attribute is parsed with ``parse_keyword``; how the result is
    emitted depends on the ``数据来源`` attribute:

    * ``报表指标`` / ``指标库``: the parsed code list is emitted as is.
    * ``主材单价``, ``主材参数``, ``主材数量``, ``定额参数``, ``定额数量``,
      ``工程费用``, ``项目划分费用``: the code list is embedded in a sentence
      that also mentions ``指标提取范围`` when that attribute is present.
    * Any other (or missing) source: the record is skipped.

    Args:
        xml_content: XML document text.

    Returns:
        A JSON array serialised as a ``str`` (non-ASCII kept, 4-space indent).

    Raises:
        xml.etree.ElementTree.ParseError: If ``xml_content`` is not
            well-formed XML.
    """
    root = ET.fromstring(xml_content)
    result = []
    for record in root.findall('.//records/record'):
        data_sources = record.get("数据来源")
        is_raw = data_sources in _RAW_MAPPING_SOURCES
        if not is_raw and data_sources not in _DESCRIBED_MAPPING_SOURCES:
            # Unsupported source: nothing is emitted for this record.
            continue

        indicator_name = record.get("指标名称")
        # A record without a keyword attribute yields None; parse it as "".
        parsed = parse_keyword(record.get("关键字") or "", indicator_name)

        if is_raw:
            mapping = parsed["指标映射"]
            rule = parsed["映射规则"]
        else:
            mapping = _describe_mapping(
                data_sources, record.get("指标提取范围"), parsed["指标映射"]
            )
            rule = f"{parsed['映射规则']}"

        result.append({
            "指标名称": indicator_name,
            "指标描述": {
                "指标映射": mapping,
                "映射规则": rule,
            },
            "code": "",
            "单位": record.get("单位"),
            "单价类型": record.get("单价类型"),
            "序号": record.get("序号"),
            "提取方式": record.get("提取方式"),
        })
    return json.dumps(result, ensure_ascii=False, indent=4)
def parse_indicator_string_to_json(indicator_str: str, output_path: str = "output.json"):
    """Validate a JSON string and write it, re-indented, to ``output_path``.

    Args:
        indicator_str: Text expected to contain a JSON document.
        output_path: Destination file; written as UTF-8 with 2-space
            indentation and non-ASCII characters kept as is.

    Returns:
        The status string ``"结果已保存"`` after the file has been written, or
        a dict ``{"error": "..."}`` when ``indicator_str`` is not valid JSON
        (in which case no file is written).

    Raises:
        OSError: If ``output_path`` cannot be opened for writing.
    """
    # Only the parse step can raise JSONDecodeError, so only it is guarded.
    try:
        result = json.loads(indicator_str)
    except json.JSONDecodeError as e:
        return {"error": f"JSON解析失败: {str(e)}"}

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False, indent=2)
    # A status message is returned, not the parsed object.
    return "结果已保存"
# Script section: convert the source indicator XML into a JSON result file.
# These statements run at module level, i.e. whenever the file is executed
# or imported.
SOURCE_XML_PATH = './data/主网架空线路造价分析指标.xml'
RESULT_JSON_PATH = "./data/result.json"

xml_content = read_xml_as_string(SOURCE_XML_PATH)
json_output = xml_to_json(xml_content)
parse_indicator_string_to_json(json_output, output_path=RESULT_JSON_PATH)
print("转换完毕!")