import chardet import xml.etree.ElementTree as ET import json def read_xml_as_string(file_path): # 先读取部分字节探测编码 with open(file_path, 'rb') as f: raw_data = f.read() result = chardet.detect(raw_data) encoding = result['encoding'] # 使用探测到的编码重新读取为字符串 return raw_data.decode(encoding) def parse_keyword(keyword, indicator_name): # 特殊处理:电压等级 # if indicator_name == "电压等级": # return {"映射规则": "1", "指标映射": [keyword]} # 处理范围表达式(包含"||"分隔符) if "||" in keyword: parts = keyword.split("||") table_rows = [] all_codes = [] for part in parts: if "@@" not in part: continue codes_str, value = part.split("@@", 1) code_ranges = codes_str.split("、") for code_range in code_ranges: # 处理连续编号(如YX5-67~69) if "~" in code_range: prefix, range_part = code_range.rsplit("-", 1) start_str, end_str = range_part.split("~") try: start = int(start_str) end = int(end_str) for num in range(start, end + 1): all_codes.append(f"{prefix}-{num}") except ValueError: all_codes.append(code_range) else: all_codes.append(code_range) table_rows.append(f"| {codes_str} | {value} |") rule_table = "| 资源识别规则 | 指标值 |\n|-------|-------|\n" + "\n".join(table_rows) return {"映射规则": rule_table, "指标映射": all_codes} # 处理数学公式(包含"/"和括号) if "/" in keyword and "(" in keyword and ")" in keyword: # 提取分子(括号前部分) molecule = keyword.split("/")[0].strip() # 提取分母(括号内部分) denominator_start = keyword.find("(") + 1 denominator_end = keyword.find(")") denominator_expr = keyword[denominator_start:denominator_end] # 分割分母中的加法项 denominator_items = [item.strip() for item in denominator_expr.split("+")] return {"映射规则": keyword, "指标映射": [molecule] + denominator_items} # 处理加法表达式 if "+" in keyword: items = [item.strip() for item in keyword.split("+")] return {"映射规则": keyword, "指标映射": items} # 默认处理(普通关键字) return {"映射规则": keyword, "指标映射": [keyword]} def xml_to_json(xml_content): root = ET.fromstring(xml_content) records = root.findall('.//records/record') result = [] for record in records: unit = record.get("单位") unit_type = record.get("单价类型") order = record.get("序号") extraction_method = record.get("提取方式") indicator_type = record.get("指标类型") index_extraction_scope = record.get("指标提取范围") data_sources = record.get("数据来源") indicator_name = record.get("指标名称") keyword = record.get("关键字") parsed = parse_keyword(keyword, indicator_name) if data_sources == "报表指标": result.append({ "指标名称": indicator_name, "指标描述": { "指标映射": parsed["指标映射"], "映射规则": parsed["映射规则"] }, "code": "", "单位": unit, "单价类型": unit_type, "序号": order, "提取方式": extraction_method }) elif data_sources == "主材单价": if index_extraction_scope is not None: temp0 = parsed["指标映射"] temp1 = parsed["映射规则"] temp2 = data_sources[-2:] result.append({ "指标名称": indicator_name, "指标描述": { "指标映射": f"从项目划分【{index_extraction_scope}】下所有子孙项目划分中查找名称属于{temp0}的所有{temp2}", "映射规则": f"{temp1}" }, "code": "", "单位": unit, "单价类型": unit_type, "序号": order, "提取方式": extraction_method }) else: temp0 = parsed["指标映射"] temp1 = parsed["映射规则"] result.append({ "指标名称": indicator_name, "指标描述": { "指标映射": f"从【{data_sources}】中获取{temp0}的属性", "映射规则": f"{temp1}" }, "code": "", "单位": unit, "单价类型": unit_type, "序号": order, "提取方式": extraction_method }) elif data_sources == "主材参数": if index_extraction_scope is not None: temp0 = parsed["指标映射"] temp1 = parsed["映射规则"] temp2 = data_sources[-2:] result.append({ "指标名称": indicator_name, "指标描述": { "指标映射": f"从项目划分【{index_extraction_scope}】下所有子孙项目划分中查找名称属于{temp0}的所有{temp2}", "映射规则": f"{temp1}" }, "code": "", "单位": unit, "单价类型": unit_type, "序号": order, "提取方式": extraction_method }) else: temp0 = parsed["指标映射"] temp1 = parsed["映射规则"] result.append({ "指标名称": indicator_name, "指标描述": { "指标映射": f"从【{data_sources}】中获取{temp0}的属性", "映射规则": f"{temp1}" }, "code": "", "单位": unit, "单价类型": unit_type, "序号": order, "提取方式": extraction_method }) elif data_sources == "主材数量": if index_extraction_scope is not None: temp0 = parsed["指标映射"] temp1 = parsed["映射规则"] temp2 = data_sources[-2:] result.append({ "指标名称": indicator_name, "指标描述": { "指标映射": f"从项目划分【{index_extraction_scope}】下所有子孙项目划分中查找名称属于{temp0}的所有{temp2}", "映射规则": f"{temp1}" }, "code": "", "单位": unit, "单价类型": unit_type, "序号": order, "提取方式": extraction_method }) else: temp0 = parsed["指标映射"] temp1 = parsed["映射规则"] result.append({ "指标名称": indicator_name, "指标描述": { "指标映射": f"从【{data_sources}】中获取{temp0}的属性", "映射规则": f"{temp1}" }, "code": "", "单位": unit, "单价类型": unit_type, "序号": order, "提取方式": extraction_method }) elif data_sources == "定额参数": if index_extraction_scope is not None: temp0 = parsed["指标映射"] temp1 = parsed["映射规则"] temp2 = data_sources[-2:] result.append({ "指标名称": indicator_name, "指标描述": { "指标映射": f"从项目划分【{index_extraction_scope}】下所有子孙项目划分中查找名称属于{temp0}的所有{temp2}", "映射规则": f"{temp1}" }, "code": "", "单位": unit, "单价类型": unit_type, "序号": order, "提取方式": extraction_method }) else: temp0 = parsed["指标映射"] temp1 = parsed["映射规则"] result.append({ "指标名称": indicator_name, "指标描述": { "指标映射": f"从【{data_sources}】中获取{temp0}的属性", "映射规则": f"{temp1}" }, "code": "", "单位": unit, "单价类型": unit_type, "序号": order, "提取方式": extraction_method }) elif data_sources == "定额数量": if index_extraction_scope is not None: temp0 = parsed["指标映射"] temp1 = parsed["映射规则"] temp2 = data_sources[-2:] result.append({ "指标名称": indicator_name, "指标描述": { "指标映射": f"从项目划分【{index_extraction_scope}】下所有子孙项目划分中查找名称属于{temp0}的所有{temp2}", "映射规则": f"{temp1}" }, "code": "", "单位": unit, "单价类型": unit_type, "序号": order, "提取方式": extraction_method }) else: temp0 = parsed["指标映射"] temp1 = parsed["映射规则"] result.append({ "指标名称": indicator_name, "指标描述": { "指标映射": f"从【{data_sources}】中获取{temp0}的属性", "映射规则": f"{temp1}" }, "code": "", "单位": unit, "单价类型": unit_type, "序号": order, "提取方式": extraction_method }) elif data_sources == "工程费用": if index_extraction_scope is not None: temp0 = parsed["指标映射"] temp1 = parsed["映射规则"] temp2 = data_sources[-2:] result.append({ "指标名称": indicator_name, "指标描述": { "指标映射": f"从项目划分【{index_extraction_scope}】下所有子孙项目划分中查找名称属于{temp0}的所有{temp2}", "映射规则": f"{temp1}" }, "code": "", "单位": unit, "单价类型": unit_type, "序号": order, "提取方式": extraction_method }) else: temp0 = parsed["指标映射"] temp1 = parsed["映射规则"] result.append({ "指标名称": indicator_name, "指标描述": { "指标映射": f"从【{data_sources}】中获取{temp0}的属性", "映射规则": f"{temp1}" }, "code": "", "单位": unit, "单价类型": unit_type, "序号": order, "提取方式": extraction_method }) elif data_sources == "指标库": result.append({ "指标名称": indicator_name, "指标描述": { "指标映射": parsed["指标映射"], "映射规则": parsed["映射规则"] }, "code": "", "单位": unit, "单价类型": unit_type, "序号": order, "提取方式": extraction_method }) elif data_sources == "项目划分费用": if index_extraction_scope is not None: temp0 = parsed["指标映射"] temp1 = parsed["映射规则"] temp2 = data_sources[-2:] result.append({ "指标名称": indicator_name, "指标描述": { "指标映射": f"从项目划分【{index_extraction_scope}】下所有子孙项目划分中查找名称属于{temp0}的所有{temp2}", "映射规则": f"{temp1}" }, "code": "", "单位": unit, "单价类型": unit_type, "序号": order, "提取方式": extraction_method }) else: temp0 = parsed["指标映射"] temp1 = parsed["映射规则"] result.append({ "指标名称": indicator_name, "指标描述": { "指标映射": f"从【{data_sources}】中获取{temp0}的属性", "映射规则": f"{temp1}" }, "code": "", "单位": unit, "单价类型": unit_type, "序号": order, "提取方式": extraction_method }) return json.dumps(result, ensure_ascii=False, indent=4) def parse_indicator_string_to_json(indicator_str: str, output_path: str = "output.json"): try: # 解析为 JSON 对象 result = json.loads(indicator_str) # 过滤 for item in result: mapping = item.get("指标描述", {}).get("指标映射") if isinstance(mapping, str): if "【工程费用】" in mapping: cleaned = re.sub(r"@.*?\.", "", mapping) item["指标描述"]["指标映射"] = cleaned elif isinstance(mapping, list): first_elem = mapping[0] if first_elem.startswith("(") and first_elem.endswith(")"): del mapping[0] # 保存为 JSON 文件 with open(output_path, 'w', encoding='utf-8') as f: json.dump(result, f, ensure_ascii=False, indent=2) return ("结果已保存") # 返回 JSON 对象本身(非字符串) except json.JSONDecodeError as e: return {"error": f"JSON解析失败: {str(e)}"} xml_content = read_xml_as_string('./data/主网架空线路造价分析指标.xml') json_output = xml_to_json(xml_content) parse_indicator_string_to_json(json_output, output_path= "./data/result.json") print("转换完毕!")