From 699ef24d453e51c64e51e4b7ab585d9332b0e779 Mon Sep 17 00:00:00 2001 From: chentianrui Date: Sat, 2 Aug 2025 14:51:43 +0800 Subject: [PATCH 1/9] =?UTF-8?q?=E5=88=A0=E9=99=A4=20kg=5Fvisualization.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kg_visualization.py | 493 -------------------------------------------- 1 file changed, 493 deletions(-) delete mode 100644 kg_visualization.py diff --git a/kg_visualization.py b/kg_visualization.py deleted file mode 100644 index a41551d..0000000 --- a/kg_visualization.py +++ /dev/null @@ -1,493 +0,0 @@ -from flask import Flask, render_template, jsonify, request, send_file -from neo4j import GraphDatabase -from anytree import Node -import json -import configparser -import os -import pandas as pd -from io import BytesIO - -app = Flask(__name__, template_folder="templates") - - -# 读取配置文件 -def read_config(config_file="config.ini"): - """读取配置文件""" - config = configparser.ConfigParser() - config.read(config_file, encoding="utf-8") - return config - - -# 获取Neo4j连接配置 -config = read_config() -NEO4J_URI = config["neo4j"]["uri"] -NEO4J_USERNAME = config["neo4j"]["user"] -NEO4J_PASSWORD = config["neo4j"]["password"] - -driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USERNAME, NEO4J_PASSWORD)) - - -class KnowledgeGraphService: - @staticmethod - def get_hierarchy(): - """获取层次结构数据""" - # 修改查询,获取所有节点和它们之间的关系,包括HAS_CHILD和USE关系 - query = """ - MATCH path = (root:EngineeringData)-[:HAS_CHILD|USE*]->(node) - UNWIND range(0, length(path)-1) AS i - WITH path, i - MATCH (parent)-[r:HAS_CHILD|USE]->(child) - WHERE parent = nodes(path)[i] AND child = nodes(path)[i+1] - RETURN DISTINCT - labels(parent)[0] AS parent_type, - id(parent) AS parent_id, - labels(child)[0] AS child_type, - id(child) AS child_id, - parent.name AS parent_name, - child.name AS child_name, - type(r) AS relationship_type, - child.amount as amount, - child.unit as unit, - child.unitPrice as unit_price, - child.totalPrice as total_price - """ - - with driver.session() as session: - result = session.run(query) - return [ - { - "parent_type": record["parent_type"], - "parent_id": record["parent_id"], - "child_type": record["child_type"], - "child_id": record["child_id"], - "parent_name": record["parent_name"], - "child_name": record["child_name"], - "relationship_type": record["relationship_type"], - "amount": record.get("amount"), - "unit": record.get("unit"), - "unit_price": record.get("unit_price"), - "total_price": record.get("total_price"), - } - for record in result - ] - - @staticmethod - def build_tree_structure(): - """构建树状结构""" - hierarchy_data = KnowledgeGraphService.get_hierarchy() - nodes = {} - - # 创建节点 - for item in hierarchy_data: - parent_id = item["parent_id"] - child_id = item["child_id"] - - # 创建父节点 - if parent_id not in nodes: - parent_label = ( - f"{item['parent_type']}: {item['parent_name']}" if item["parent_name"] else item["parent_type"] - ) - nodes[parent_id] = { - "id": parent_id, - "label": parent_label, - "type": item["parent_type"], - "name": item["parent_name"], - "children": [], - } - - # 创建子节点 - if child_id not in nodes: - child_label = ( - f"{item['child_type']}: {item['child_name']}" if item["child_name"] else item["child_type"] - ) - # 添加费用相关属性 - child_node = { - "id": child_id, - "label": child_label, - "type": item["child_type"], - "name": item["child_name"], - "children": [], - } - - # 如果有费用相关属性,添加到节点中 - if item.get("amount") is not None: - child_node["amount"] = item["amount"] - if item.get("unit") is not None: - child_node["unit"] = item["unit"] - if item.get("unit_price") is not None: - child_node["unitPrice"] = item["unit_price"] - if item.get("total_price") is not None: - child_node["totalPrice"] = item["total_price"] - - nodes[child_id] = child_node - - # 建立父子关系 - if child_id not in [child["id"] for child in nodes[parent_id]["children"]]: - nodes[parent_id]["children"].append(nodes[child_id]) - - # 找到根节点 - 明确查找EngineeringData类型的节点作为根节点 - roots = [] - for node_id, node in nodes.items(): - if node["type"] == "EngineeringData": - roots.append(node) - - # 如果没有找到EngineeringData节点,则使用原来的方法查找根节点 - if not roots: - all_children_ids = set() - for node in nodes.values(): - for child in node["children"]: - all_children_ids.add(child["id"]) - - roots = [node for node in nodes.values() if node["id"] not in all_children_ids] - - # 为每个节点添加费用预览子节点(如果有费用数据) - KnowledgeGraphService.add_cost_preview_nodes(nodes) - - return {"roots": roots, "all_nodes": nodes} - - @staticmethod - def add_cost_preview_nodes(nodes): - """为有费用数据的节点添加费用预览子节点""" - for node_id, node in nodes.items(): - # 检查子节点是否有费用数据 - cost_items = [] - for child in node.get("children", []): - if any(key in child for key in ["amount", "unitPrice", "totalPrice"]): - cost_items.append(child) - - # 如果有费用数据,创建费用预览节点 - if cost_items: - cost_preview_node = { - "id": f"cost_preview_{node_id}", - "label": "费用预览", - "type": "CostPreview", - "name": "费用预览", - "children": cost_items.copy(), # 复制费用项作为子节点 - } - node["children"].append(cost_preview_node) - - @staticmethod - def get_node_details(node_id): - """获取节点详细信息""" - query = """ - MATCH (n) - WHERE id(n) = $node_id - RETURN n, labels(n) as labels - """ - - with driver.session() as session: - result = session.run(query, node_id=int(node_id)) - record = result.single() - - if record: - node = record["n"] - labels = record["labels"] - - # 获取所有属性 - properties = dict(node) - - return {"id": node_id, "labels": labels, "properties": properties} - return None - - @staticmethod - def get_children_details(node_id): - """获取子节点详细信息""" - query = """ - MATCH (parent)-[:HAS_CHILD|USE]->(child) - WHERE id(parent) = $node_id - RETURN child, labels(child) as labels, id(child) as child_id - """ - - with driver.session() as session: - result = session.run(query, node_id=int(node_id)) - children = [] - - for record in result: - child = record["child"] - labels = record["labels"] - child_id = record["child_id"] - - children.append({"id": child_id, "labels": labels, "properties": dict(child)}) - - return children - - @staticmethod - def get_cost_preview_data(node_id): - """获取节点关联的费用数据(CostSet及其子节点)""" - # 首先获取节点类型,以便根据不同类型使用不同的查询 - type_query = """ - MATCH (node) - WHERE id(node) = $node_id - RETURN labels(node) as node_labels - """ - - with driver.session() as session: - type_result = session.run(type_query, node_id=int(node_id)) - type_record = type_result.single() - node_types = type_record["node_labels"] if type_record else [] - - # 根据节点类型选择适当的查询 - # 查询节点通过USE关系连接的CostSet - query = """ - MATCH (node)-[:USE]->(costSet:CostSet) - WHERE id(node) = $node_id - OPTIONAL MATCH (costSet)-[:HAS_CHILD]->(costItem) - RETURN costSet, id(costSet) as cost_set_id, costSet.name as cost_set_name, - costItem, id(costItem) as cost_item_id, labels(costItem) as cost_item_labels - """ - - result = session.run(query, node_id=int(node_id)) - - # 存储CostSet节点信息 - cost_sets = {} - cost_items = [] - - for record in result: - # 处理CostSet节点 - if record.get("cost_set_id") is not None and record.get("cost_set_id") not in cost_sets: - cost_set = record["costSet"] - cost_sets[record["cost_set_id"]] = { - "id": record["cost_set_id"], - "name": record.get("cost_set_name", "费用集"), - "properties": dict(cost_set), - } - - # 处理CostItem或MaterialandmachineCostItem节点 - if record.get("costItem") is not None: - cost_item = record["costItem"] - cost_item_labels = record.get("cost_item_labels", []) - - # 构建费用项属性 - properties = dict(cost_item) - - cost_items.append( - { - "id": record["cost_item_id"], - "labels": cost_item_labels, - "properties": properties, - "cost_set_id": record["cost_set_id"], - } - ) - - # 如果没有找到CostSet,检查是否有关联的CostItem - if not cost_sets and not cost_items: - # 检查节点是否直接关联CostItem - direct_cost_query = """ - MATCH (node)-[:HAS_CHILD]->(costItem) - WHERE id(node) = $node_id AND - (costItem:CostItem OR costItem:MaterialandmachineCostItem) - RETURN costItem, id(costItem) as cost_item_id, labels(costItem) as cost_item_labels - """ - - direct_cost_result = session.run(direct_cost_query, node_id=int(node_id)) - for record in direct_cost_result: - if record.get("costItem") is not None: - cost_item = record["costItem"] - cost_item_labels = record.get("cost_item_labels", []) - - cost_items.append( - {"id": record["cost_item_id"], "labels": cost_item_labels, "properties": dict(cost_item)} - ) - - # 如果仍然没有找到费用项,检查节点自身是否有费用相关属性 - if not cost_items: - direct_query = """ - MATCH (node) - WHERE id(node) = $node_id AND - (node.amount IS NOT NULL OR - node.unitPrice IS NOT NULL OR - node.totalPrice IS NOT NULL OR - node.cost IS NOT NULL) - RETURN node, id(node) as node_id, labels(node) as node_labels - """ - - direct_result = session.run(direct_query, node_id=int(node_id)) - for record in direct_result: - if record.get("node") is not None: - node = record["node"] - properties = dict(node) - - cost_items.append( - { - "id": record["node_id"], - "labels": record.get("node_labels", []), - "properties": properties, - } - ) - - return {"cost_sets": list(cost_sets.values()), "cost_items": cost_items} - - @staticmethod - def export_tree_to_excel(node_id=None): - """将整个工程导出为Excel格式""" - # 获取EngineeringData根节点 - eng_data = KnowledgeGraphService.get_engineering_data_node() - if not eng_data: - return None, "未找到工程根节点" - - # 获取根节点详情 - root_id = eng_data["id"] - root_name = eng_data["name"] - - # 递归获取所有子节点 - tree_data = KnowledgeGraphService.get_full_tree_structure(root_id) - - # 将树状结构转换为扁平结构,适合Excel展示 - flat_data = KnowledgeGraphService.flatten_tree_structure(tree_data) - - # 创建DataFrame - df = pd.DataFrame(flat_data) - - # 创建Excel文件 - output = BytesIO() - with pd.ExcelWriter(output, engine="openpyxl") as writer: - df.to_excel(writer, index=False) - - output.seek(0) - return output, root_name - - @staticmethod - def get_full_tree_structure(node_id): - """递归获取节点及其所有子节点的完整树状结构""" - # 获取节点详情 - node_details = KnowledgeGraphService.get_node_details(node_id) - if not node_details: - return None - - # 获取子节点 - children = KnowledgeGraphService.get_children_details(node_id) - - # 构建树节点 - tree_node = { - "id": node_id, - "type": node_details.get("labels", [""])[0] if node_details.get("labels") else "", - "name": node_details.get("properties", {}).get("name", ""), - "properties": node_details.get("properties", {}), - "children": [], - } - - # 递归处理子节点 - for child in children: - # 跳过费用预览相关节点 - if (child.get("labels") and any("Cost" in label for label in child.get("labels"))) or ( - child.get("properties", {}).get("name", "").find("费用预览") != -1 - ): - continue - - child_tree = KnowledgeGraphService.get_full_tree_structure(child["id"]) - if child_tree: - tree_node["children"].append(child_tree) - - return tree_node - - @staticmethod - def flatten_tree_structure(tree_node, level=0, parent_path=""): - """将树状结构转换为扁平结构,适合Excel展示""" - if not tree_node: - return [] - - # 当前节点的路径 - current_path = f"{parent_path}/{tree_node['name']}" if parent_path else tree_node["name"] - - # 当前节点的数据 - node_data = { - "层级": level, - "路径": current_path, - "节点类型": tree_node["type"], - "节点名称": tree_node["name"], - } - - # 添加其他属性 - properties = tree_node.get("properties", {}) - for key, value in properties.items(): - if key not in ["name"]: # 排除已包含的属性 - node_data[key] = value - - # 当前节点及其所有子节点的扁平数据 - flat_data = [node_data] - - # 递归处理子节点 - for child in tree_node.get("children", []): - flat_data.extend(KnowledgeGraphService.flatten_tree_structure(child, level + 1, current_path)) - - return flat_data - - @staticmethod - def get_engineering_data_node(): - """获取EngineeringData类型的根节点""" - query = """ - MATCH (n:EngineeringData) - RETURN id(n) as node_id, n.name as name - LIMIT 1 - """ - - with driver.session() as session: - result = session.run(query) - record = result.single() - - if record: - return {"id": record["node_id"], "name": record["name"] or "工程"} - return None - - -@app.route("/") -def index(): - return render_template("html_template.html") - - -@app.route("/api/tree") -def get_tree(): - """获取树状结构API""" - try: - tree_data = KnowledgeGraphService.build_tree_structure() - return jsonify(tree_data) - except Exception as e: - return jsonify({"error": str(e)}), 500 - - -@app.route("/api/node/") -def get_node_info(node_id): - """获取节点信息API""" - try: - node_details = KnowledgeGraphService.get_node_details(node_id) - children_details = KnowledgeGraphService.get_children_details(node_id) - - if node_details: - return jsonify({"node": node_details, "children": children_details}) - else: - return jsonify({"error": "Node not found"}), 404 - except Exception as e: - return jsonify({"error": str(e)}), 500 - - -@app.route("/api/node//cost") -def get_node_cost_info(node_id): - """获取节点费用信息API""" - try: - cost_data = KnowledgeGraphService.get_cost_preview_data(node_id) - return jsonify(cost_data) - except Exception as e: - return jsonify({"error": str(e)}), 500 - - -@app.route("/api/export/") -def export_node_tree(node_id): - """导出整个工程知识图谱为Excel""" - try: - # 忽略传入的node_id参数,总是导出整个工程 - excel_file, filename = KnowledgeGraphService.export_tree_to_excel(None) - if excel_file: - return send_file( - excel_file, - mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", - as_attachment=True, - download_name=f"{filename}.xlsx", - ) - else: - return jsonify({"error": "无法生成Excel文件"}), 500 - except Exception as e: - return jsonify({"error": str(e)}), 500 - - -if __name__ == "__main__": - app.run(debug=True, host="0.0.0.0", port=5000) From 93cd1482f53e3873250c9deaac4af05bc6d44b8a Mon Sep 17 00:00:00 2001 From: chentianrui Date: Sat, 2 Aug 2025 14:51:55 +0800 Subject: [PATCH 2/9] =?UTF-8?q?=E5=88=A0=E9=99=A4=20templates/html=5Ftempl?= =?UTF-8?q?ate.html?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- templates/html_template.html | 1982 ---------------------------------- 1 file changed, 1982 deletions(-) delete mode 100644 templates/html_template.html diff --git a/templates/html_template.html b/templates/html_template.html deleted file mode 100644 index 101b17e..0000000 --- a/templates/html_template.html +++ /dev/null @@ -1,1982 +0,0 @@ - - - - - - 知识图谱可视化 - - - - -
- -
-
- - 知识图谱目录 -
- -
-
- - -
- - -
- -
- -

欢迎使用知识图谱可视化系统

-

点击左侧树状结构中的任意节点,查看详细信息

-
- - - -
-
- - - - - \ No newline at end of file From ba211bf18c99781385d64aa8b5392ae4b2892afa Mon Sep 17 00:00:00 2001 From: chentianrui Date: Sat, 2 Aug 2025 14:52:57 +0800 Subject: [PATCH 3/9] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E8=87=B3=20/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kg_visualization.py | 493 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 493 insertions(+) create mode 100644 kg_visualization.py diff --git a/kg_visualization.py b/kg_visualization.py new file mode 100644 index 0000000..a41551d --- /dev/null +++ b/kg_visualization.py @@ -0,0 +1,493 @@ +from flask import Flask, render_template, jsonify, request, send_file +from neo4j import GraphDatabase +from anytree import Node +import json +import configparser +import os +import pandas as pd +from io import BytesIO + +app = Flask(__name__, template_folder="templates") + + +# 读取配置文件 +def read_config(config_file="config.ini"): + """读取配置文件""" + config = configparser.ConfigParser() + config.read(config_file, encoding="utf-8") + return config + + +# 获取Neo4j连接配置 +config = read_config() +NEO4J_URI = config["neo4j"]["uri"] +NEO4J_USERNAME = config["neo4j"]["user"] +NEO4J_PASSWORD = config["neo4j"]["password"] + +driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USERNAME, NEO4J_PASSWORD)) + + +class KnowledgeGraphService: + @staticmethod + def get_hierarchy(): + """获取层次结构数据""" + # 修改查询,获取所有节点和它们之间的关系,包括HAS_CHILD和USE关系 + query = """ + MATCH path = (root:EngineeringData)-[:HAS_CHILD|USE*]->(node) + UNWIND range(0, length(path)-1) AS i + WITH path, i + MATCH (parent)-[r:HAS_CHILD|USE]->(child) + WHERE parent = nodes(path)[i] AND child = nodes(path)[i+1] + RETURN DISTINCT + labels(parent)[0] AS parent_type, + id(parent) AS parent_id, + labels(child)[0] AS child_type, + id(child) AS child_id, + parent.name AS parent_name, + child.name AS child_name, + type(r) AS relationship_type, + child.amount as amount, + child.unit as unit, + child.unitPrice as unit_price, + child.totalPrice as total_price + """ + + with driver.session() as session: + result = session.run(query) + return [ + { + "parent_type": record["parent_type"], + "parent_id": record["parent_id"], + "child_type": record["child_type"], + "child_id": record["child_id"], + "parent_name": record["parent_name"], + "child_name": record["child_name"], + "relationship_type": record["relationship_type"], + "amount": record.get("amount"), + "unit": record.get("unit"), + "unit_price": record.get("unit_price"), + "total_price": record.get("total_price"), + } + for record in result + ] + + @staticmethod + def build_tree_structure(): + """构建树状结构""" + hierarchy_data = KnowledgeGraphService.get_hierarchy() + nodes = {} + + # 创建节点 + for item in hierarchy_data: + parent_id = item["parent_id"] + child_id = item["child_id"] + + # 创建父节点 + if parent_id not in nodes: + parent_label = ( + f"{item['parent_type']}: {item['parent_name']}" if item["parent_name"] else item["parent_type"] + ) + nodes[parent_id] = { + "id": parent_id, + "label": parent_label, + "type": item["parent_type"], + "name": item["parent_name"], + "children": [], + } + + # 创建子节点 + if child_id not in nodes: + child_label = ( + f"{item['child_type']}: {item['child_name']}" if item["child_name"] else item["child_type"] + ) + # 添加费用相关属性 + child_node = { + "id": child_id, + "label": child_label, + "type": item["child_type"], + "name": item["child_name"], + "children": [], + } + + # 如果有费用相关属性,添加到节点中 + if item.get("amount") is not None: + child_node["amount"] = item["amount"] + if item.get("unit") is not None: + child_node["unit"] = item["unit"] + if item.get("unit_price") is not None: + child_node["unitPrice"] = item["unit_price"] + if item.get("total_price") is not None: + child_node["totalPrice"] = item["total_price"] + + nodes[child_id] = child_node + + # 建立父子关系 + if child_id not in [child["id"] for child in nodes[parent_id]["children"]]: + nodes[parent_id]["children"].append(nodes[child_id]) + + # 找到根节点 - 明确查找EngineeringData类型的节点作为根节点 + roots = [] + for node_id, node in nodes.items(): + if node["type"] == "EngineeringData": + roots.append(node) + + # 如果没有找到EngineeringData节点,则使用原来的方法查找根节点 + if not roots: + all_children_ids = set() + for node in nodes.values(): + for child in node["children"]: + all_children_ids.add(child["id"]) + + roots = [node for node in nodes.values() if node["id"] not in all_children_ids] + + # 为每个节点添加费用预览子节点(如果有费用数据) + KnowledgeGraphService.add_cost_preview_nodes(nodes) + + return {"roots": roots, "all_nodes": nodes} + + @staticmethod + def add_cost_preview_nodes(nodes): + """为有费用数据的节点添加费用预览子节点""" + for node_id, node in nodes.items(): + # 检查子节点是否有费用数据 + cost_items = [] + for child in node.get("children", []): + if any(key in child for key in ["amount", "unitPrice", "totalPrice"]): + cost_items.append(child) + + # 如果有费用数据,创建费用预览节点 + if cost_items: + cost_preview_node = { + "id": f"cost_preview_{node_id}", + "label": "费用预览", + "type": "CostPreview", + "name": "费用预览", + "children": cost_items.copy(), # 复制费用项作为子节点 + } + node["children"].append(cost_preview_node) + + @staticmethod + def get_node_details(node_id): + """获取节点详细信息""" + query = """ + MATCH (n) + WHERE id(n) = $node_id + RETURN n, labels(n) as labels + """ + + with driver.session() as session: + result = session.run(query, node_id=int(node_id)) + record = result.single() + + if record: + node = record["n"] + labels = record["labels"] + + # 获取所有属性 + properties = dict(node) + + return {"id": node_id, "labels": labels, "properties": properties} + return None + + @staticmethod + def get_children_details(node_id): + """获取子节点详细信息""" + query = """ + MATCH (parent)-[:HAS_CHILD|USE]->(child) + WHERE id(parent) = $node_id + RETURN child, labels(child) as labels, id(child) as child_id + """ + + with driver.session() as session: + result = session.run(query, node_id=int(node_id)) + children = [] + + for record in result: + child = record["child"] + labels = record["labels"] + child_id = record["child_id"] + + children.append({"id": child_id, "labels": labels, "properties": dict(child)}) + + return children + + @staticmethod + def get_cost_preview_data(node_id): + """获取节点关联的费用数据(CostSet及其子节点)""" + # 首先获取节点类型,以便根据不同类型使用不同的查询 + type_query = """ + MATCH (node) + WHERE id(node) = $node_id + RETURN labels(node) as node_labels + """ + + with driver.session() as session: + type_result = session.run(type_query, node_id=int(node_id)) + type_record = type_result.single() + node_types = type_record["node_labels"] if type_record else [] + + # 根据节点类型选择适当的查询 + # 查询节点通过USE关系连接的CostSet + query = """ + MATCH (node)-[:USE]->(costSet:CostSet) + WHERE id(node) = $node_id + OPTIONAL MATCH (costSet)-[:HAS_CHILD]->(costItem) + RETURN costSet, id(costSet) as cost_set_id, costSet.name as cost_set_name, + costItem, id(costItem) as cost_item_id, labels(costItem) as cost_item_labels + """ + + result = session.run(query, node_id=int(node_id)) + + # 存储CostSet节点信息 + cost_sets = {} + cost_items = [] + + for record in result: + # 处理CostSet节点 + if record.get("cost_set_id") is not None and record.get("cost_set_id") not in cost_sets: + cost_set = record["costSet"] + cost_sets[record["cost_set_id"]] = { + "id": record["cost_set_id"], + "name": record.get("cost_set_name", "费用集"), + "properties": dict(cost_set), + } + + # 处理CostItem或MaterialandmachineCostItem节点 + if record.get("costItem") is not None: + cost_item = record["costItem"] + cost_item_labels = record.get("cost_item_labels", []) + + # 构建费用项属性 + properties = dict(cost_item) + + cost_items.append( + { + "id": record["cost_item_id"], + "labels": cost_item_labels, + "properties": properties, + "cost_set_id": record["cost_set_id"], + } + ) + + # 如果没有找到CostSet,检查是否有关联的CostItem + if not cost_sets and not cost_items: + # 检查节点是否直接关联CostItem + direct_cost_query = """ + MATCH (node)-[:HAS_CHILD]->(costItem) + WHERE id(node) = $node_id AND + (costItem:CostItem OR costItem:MaterialandmachineCostItem) + RETURN costItem, id(costItem) as cost_item_id, labels(costItem) as cost_item_labels + """ + + direct_cost_result = session.run(direct_cost_query, node_id=int(node_id)) + for record in direct_cost_result: + if record.get("costItem") is not None: + cost_item = record["costItem"] + cost_item_labels = record.get("cost_item_labels", []) + + cost_items.append( + {"id": record["cost_item_id"], "labels": cost_item_labels, "properties": dict(cost_item)} + ) + + # 如果仍然没有找到费用项,检查节点自身是否有费用相关属性 + if not cost_items: + direct_query = """ + MATCH (node) + WHERE id(node) = $node_id AND + (node.amount IS NOT NULL OR + node.unitPrice IS NOT NULL OR + node.totalPrice IS NOT NULL OR + node.cost IS NOT NULL) + RETURN node, id(node) as node_id, labels(node) as node_labels + """ + + direct_result = session.run(direct_query, node_id=int(node_id)) + for record in direct_result: + if record.get("node") is not None: + node = record["node"] + properties = dict(node) + + cost_items.append( + { + "id": record["node_id"], + "labels": record.get("node_labels", []), + "properties": properties, + } + ) + + return {"cost_sets": list(cost_sets.values()), "cost_items": cost_items} + + @staticmethod + def export_tree_to_excel(node_id=None): + """将整个工程导出为Excel格式""" + # 获取EngineeringData根节点 + eng_data = KnowledgeGraphService.get_engineering_data_node() + if not eng_data: + return None, "未找到工程根节点" + + # 获取根节点详情 + root_id = eng_data["id"] + root_name = eng_data["name"] + + # 递归获取所有子节点 + tree_data = KnowledgeGraphService.get_full_tree_structure(root_id) + + # 将树状结构转换为扁平结构,适合Excel展示 + flat_data = KnowledgeGraphService.flatten_tree_structure(tree_data) + + # 创建DataFrame + df = pd.DataFrame(flat_data) + + # 创建Excel文件 + output = BytesIO() + with pd.ExcelWriter(output, engine="openpyxl") as writer: + df.to_excel(writer, index=False) + + output.seek(0) + return output, root_name + + @staticmethod + def get_full_tree_structure(node_id): + """递归获取节点及其所有子节点的完整树状结构""" + # 获取节点详情 + node_details = KnowledgeGraphService.get_node_details(node_id) + if not node_details: + return None + + # 获取子节点 + children = KnowledgeGraphService.get_children_details(node_id) + + # 构建树节点 + tree_node = { + "id": node_id, + "type": node_details.get("labels", [""])[0] if node_details.get("labels") else "", + "name": node_details.get("properties", {}).get("name", ""), + "properties": node_details.get("properties", {}), + "children": [], + } + + # 递归处理子节点 + for child in children: + # 跳过费用预览相关节点 + if (child.get("labels") and any("Cost" in label for label in child.get("labels"))) or ( + child.get("properties", {}).get("name", "").find("费用预览") != -1 + ): + continue + + child_tree = KnowledgeGraphService.get_full_tree_structure(child["id"]) + if child_tree: + tree_node["children"].append(child_tree) + + return tree_node + + @staticmethod + def flatten_tree_structure(tree_node, level=0, parent_path=""): + """将树状结构转换为扁平结构,适合Excel展示""" + if not tree_node: + return [] + + # 当前节点的路径 + current_path = f"{parent_path}/{tree_node['name']}" if parent_path else tree_node["name"] + + # 当前节点的数据 + node_data = { + "层级": level, + "路径": current_path, + "节点类型": tree_node["type"], + "节点名称": tree_node["name"], + } + + # 添加其他属性 + properties = tree_node.get("properties", {}) + for key, value in properties.items(): + if key not in ["name"]: # 排除已包含的属性 + node_data[key] = value + + # 当前节点及其所有子节点的扁平数据 + flat_data = [node_data] + + # 递归处理子节点 + for child in tree_node.get("children", []): + flat_data.extend(KnowledgeGraphService.flatten_tree_structure(child, level + 1, current_path)) + + return flat_data + + @staticmethod + def get_engineering_data_node(): + """获取EngineeringData类型的根节点""" + query = """ + MATCH (n:EngineeringData) + RETURN id(n) as node_id, n.name as name + LIMIT 1 + """ + + with driver.session() as session: + result = session.run(query) + record = result.single() + + if record: + return {"id": record["node_id"], "name": record["name"] or "工程"} + return None + + +@app.route("/") +def index(): + return render_template("html_template.html") + + +@app.route("/api/tree") +def get_tree(): + """获取树状结构API""" + try: + tree_data = KnowledgeGraphService.build_tree_structure() + return jsonify(tree_data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/node/") +def get_node_info(node_id): + """获取节点信息API""" + try: + node_details = KnowledgeGraphService.get_node_details(node_id) + children_details = KnowledgeGraphService.get_children_details(node_id) + + if node_details: + return jsonify({"node": node_details, "children": children_details}) + else: + return jsonify({"error": "Node not found"}), 404 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/node//cost") +def get_node_cost_info(node_id): + """获取节点费用信息API""" + try: + cost_data = KnowledgeGraphService.get_cost_preview_data(node_id) + return jsonify(cost_data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/export/") +def export_node_tree(node_id): + """导出整个工程知识图谱为Excel""" + try: + # 忽略传入的node_id参数,总是导出整个工程 + excel_file, filename = KnowledgeGraphService.export_tree_to_excel(None) + if excel_file: + return send_file( + excel_file, + mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + as_attachment=True, + download_name=f"{filename}.xlsx", + ) + else: + return jsonify({"error": "无法生成Excel文件"}), 500 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +if __name__ == "__main__": + app.run(debug=True, host="0.0.0.0", port=5000) From dc747279432847cf5eb67193795a159dfdfbccaf Mon Sep 17 00:00:00 2001 From: chentianrui Date: Sat, 2 Aug 2025 14:53:22 +0800 Subject: [PATCH 4/9] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E8=87=B3=20templates?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- templates/config.ini | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 templates/config.ini diff --git a/templates/config.ini b/templates/config.ini new file mode 100644 index 0000000..e65d727 --- /dev/null +++ b/templates/config.ini @@ -0,0 +1,4 @@ +[neo4j] +uri = bolt://10.1.6.34:7687 +user = neo4j +password = password \ No newline at end of file From 29772c7ad776b27ec854a79b0892addd70dd4fd0 Mon Sep 17 00:00:00 2001 From: chentianrui Date: Sat, 2 Aug 2025 14:53:50 +0800 Subject: [PATCH 5/9] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20templates/html=5Ftempl?= =?UTF-8?q?ate.html?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- templates/html_template.html | 1982 ++++++++++++++++++++++++++++++++++ 1 file changed, 1982 insertions(+) create mode 100644 templates/html_template.html diff --git a/templates/html_template.html b/templates/html_template.html new file mode 100644 index 0000000..101b17e --- /dev/null +++ b/templates/html_template.html @@ -0,0 +1,1982 @@ + + + + + + 知识图谱可视化 + + + + +
+ +
+
+ + 知识图谱目录 +
+ +
+
+ + +
+ + +
+ +
+ +

欢迎使用知识图谱可视化系统

+

点击左侧树状结构中的任意节点,查看详细信息

+
+ + + +
+
+ + + + + \ No newline at end of file From b4fe96532df44ddf197a4b43c8c3a055a40c7da7 Mon Sep 17 00:00:00 2001 From: chentianrui Date: Sat, 2 Aug 2025 14:54:07 +0800 Subject: [PATCH 6/9] =?UTF-8?q?=E5=88=A0=E9=99=A4=20templates/config.ini?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- templates/config.ini | 4 ---- 1 file changed, 4 deletions(-) delete mode 100644 templates/config.ini diff --git a/templates/config.ini b/templates/config.ini deleted file mode 100644 index e65d727..0000000 --- a/templates/config.ini +++ /dev/null @@ -1,4 +0,0 @@ -[neo4j] -uri = bolt://10.1.6.34:7687 -user = neo4j -password = password \ No newline at end of file From 377f7012f0e010891dd0de1b364f63e8f19c725f Mon Sep 17 00:00:00 2001 From: chentianrui Date: Sat, 2 Aug 2025 14:54:25 +0800 Subject: [PATCH 7/9] =?UTF-8?q?=E5=88=A0=E9=99=A4=20equipment=5Fcalculatio?= =?UTF-8?q?n/main.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- equipment_calculation/main.py | 315 ---------------------------------- 1 file changed, 315 deletions(-) delete mode 100644 equipment_calculation/main.py diff --git a/equipment_calculation/main.py b/equipment_calculation/main.py deleted file mode 100644 index 831f60f..0000000 --- a/equipment_calculation/main.py +++ /dev/null @@ -1,315 +0,0 @@ -""" -第二步:计算bcl结果 -""" - -import os -import argparse -import re -import json -from typing import Dict, List, Any, Optional, Tuple -from equipment_calculation.software_types import ( - get_software_type, - SoftwareCategory, - EngineeringType, - ALL_SOFTWARE_TYPES, -) -from equipment_calculation.software_calculators import get_calculator - - -def parse_arguments(): - """解析命令行参数""" - parser = argparse.ArgumentParser(description="工程量取费和人材机合价计算程序") - - # 软件类型参数 - parser.add_argument( - "--category", choices=["主网", "配网", "技改"], default="主网", help="软件类别(主网/配网/技改)" - ) - - parser.add_argument("--engineering-type", choices=["预算", "清单"], default="清单", help="工程类型(预算/清单)") - - # 计算类型参数 - parser.add_argument( - "--calculate", - choices=["all", "quantity", "resource"], - default="quantity", - help="计算类型(all: 全部, quantity: 工程量取费, resource: 人材机合价)", - ) - - # 项目参数 - parser.add_argument("--project", help="项目名称,如果不指定则处理所有项目") - - parser.add_argument("--adjustment-type", default="调差", help="调差类型,默认为'调差'") - - # 输入文件参数 - parser.add_argument( - "--input-file", - default="测试案例/主网清单变电.json", - help="输入JSON文件路径,如果不指定则使用默认路径", - ) - - # 添加输入和输出文件夹参数 - parser.add_argument("--input-folder", help="输入文件夹路径,包含要处理的JSON文件") - parser.add_argument("--output-folder", default="计算结果", help="输出文件夹路径,用于保存计算结果") - - return parser.parse_args() - - -def parse_filename(filename: str) -> Tuple[str, str]: - """ - 从文件名中解析软件类别和工程类型(备用方法) - - :param filename: JSON文件名,例如"主网预算变电.json" - :return: (category, engineering_type) 元组,例如 ("主网", "预算") - """ - # 移除扩展名 - basename = os.path.splitext(filename)[0] - - # 查找类别(主网/配网/技改) - category = None - for cat in ["主网", "配网", "技改"]: - if cat in basename: - category = cat - break - - # 查找工程类型(预算/清单) - engineering_type = None - for eng_type in ["预算", "清单"]: - if eng_type in basename: - engineering_type = eng_type - break - - # 如果未找到,使用默认值 - if not category: - print(f"警告: 无法从文件名 '{filename}' 中解析软件类别,使用默认值 '主网'") - category = "主网" - - if not engineering_type: - print(f"警告: 无法从文件名 '{filename}' 中解析工程类型,使用默认值 '清单'") - engineering_type = "清单" - - return category, engineering_type - - -def parse_json_content(json_file_path: str) -> Tuple[Optional[str], Optional[str]]: - """ - 从JSON文件内容中解析软件类别和工程类型 - - :param json_file_path: JSON文件路径 - :return: (category, engineering_type) 元组,如果解析失败则返回 (None, None) - """ - try: - with open(json_file_path, "r", encoding="utf-8") as f: - data = json.load(f) - - # 定义阶段类型映射表 - budget_types = ["概预算", "定额", "估算", "概算"] - list_types = ["清单", "结算", "招标控制价", "招投标工程"] - - # 从division字段获取软件名称和阶段类型 - if "division" in data: - division = data["division"] - print(f"找到division字段: {division}") - - # 使用-分割division字段 - parts = division.split("-") - if len(parts) >= 2: - category = parts[0].strip() - stage_type = parts[1].strip() - - # 验证软件类别 - if category not in ["主网", "配网", "技改"]: - print(f"警告: division中的软件名称 '{category}' 不是有效值,将使用默认值 '主网'") - category = "主网" - - # 映射阶段类型 - if any(budget_type in stage_type for budget_type in budget_types): - engineering_type = "预算" - elif any(list_type in stage_type for list_type in list_types): - engineering_type = "清单" - else: - print(f"警告: division中的阶段类型 '{stage_type}' 无法映射到预算或清单,将使用默认值 '清单'") - engineering_type = "清单" - - print(f"从division解析: 软件名称={category}, 阶段类型={engineering_type} (原始值: {stage_type})") - return category, engineering_type - else: - print(f"警告: division字段 '{division}' 格式不正确,无法分割") - else: - print(f"警告: JSON文件中未找到division字段,尝试从basicData中解析") - - # 作为备选,尝试从basicData中获取 - if "basicData" in data: - basic_data = data["basicData"] - category = basic_data.get("软件名称") - engineering_type = basic_data.get("阶段类型") - - # 验证解析结果 - if category and engineering_type: - # 确保category是有效值 - if category not in ["主网", "配网", "技改"]: - print(f"警告: basicData中的软件名称 '{category}' 不是有效值,将使用默认值 '主网'") - category = "主网" - - # 确保engineering_type是有效值 - if engineering_type not in ["预算", "清单"]: - print(f"警告: basicData中的阶段类型 '{engineering_type}' 不是有效值,将使用默认值 '清单'") - engineering_type = "清单" - - print(f"从basicData解析: 软件名称={category}, 阶段类型={engineering_type}") - return category, engineering_type - else: - print(f"警告: basicData中未找到软件名称或阶段类型") - else: - print(f"警告: JSON文件中未找到basicData部分") - - return None, None - - except Exception as e: - print(f"解析JSON文件内容时出错: {str(e)}") - return None, None - - -def process_json_file( - input_file: str, - base_output_dir: str = "计算结果", - category: Optional[str] = None, - engineering_type: Optional[str] = None, - project: Optional[str] = None, - adjustment_type: str = "调差", -) -> bool: - """ - 处理单个JSON文件 - - :param input_file: 输入JSON文件路径 - :param base_output_dir: 基础输出目录路径,实际结果会保存在此目录下的子文件夹中 - :param category: 软件类别,如果为None则从JSON内容中解析 - :param engineering_type: 工程类型,如果为None则从JSON内容中解析 - :param project: 项目名称,如果为None则处理所有项目 - :param adjustment_type: 调差类型 - :return: 处理是否成功 - """ - try: - # 获取文件名(不含扩展名)作为输出子文件夹的名称 - filename = os.path.basename(input_file) - file_basename = os.path.splitext(filename)[0] - output_dir = os.path.join(base_output_dir, file_basename) - - # 创建输出子文件夹 - os.makedirs(output_dir, exist_ok=True) - print(f"创建输出目录: {output_dir}") - - # 如果未指定category或engineering_type,从JSON内容中解析 - if category is None or engineering_type is None: - # 首先尝试从JSON内容中解析 - json_category, json_engineering_type = parse_json_content(input_file) - - if category is None: - if json_category: - category = json_category - else: - # 如果从JSON内容中解析失败,尝试从文件名中解析(作为备用) - print("从JSON内容解析软件名称失败,尝试从文件名解析...") - parsed_category, _ = parse_filename(filename) - category = parsed_category - - if engineering_type is None: - if json_engineering_type: - engineering_type = json_engineering_type - else: - # 如果从JSON内容中解析失败,尝试从文件名中解析(作为备用) - print("从JSON内容解析阶段类型失败,尝试从文件名解析...") - _, parsed_engineering_type = parse_filename(filename) - engineering_type = parsed_engineering_type - - print(f"处理文件: {input_file}") - print(f" 软件类别: {category}") - print(f" 工程类型: {engineering_type}") - - # 获取软件类型 - software_type = get_software_type(category, engineering_type) - print(f" 使用软件类型: {software_type.name}") - - # 获取计算器 - calculator = get_calculator(software_type) - if not calculator: - print(f" 错误: 未找到软件类型 {software_type.name} 的计算器") - return False - - # 设置输出目录 - calculator.set_output_dir(output_dir) - - # 执行计算 - print(" 开始计算工程量取费表...") - calculator.calculate_quantity_fee_tables(json_file_path=input_file, project_name=project) - - print(" 开始计算人材机合价...") - calculator.calculate_resource_fee_tables(json_file_path=input_file, project_name=project) - - print(f" 处理完成: {input_file}") - print(f" 结果保存在: {output_dir}") - return True - - except Exception as e: - print(f" 处理文件 {input_file} 时出错: {str(e)}") - return False - - -def process_BCL_calculate(input_folder: str, output_folder: str) -> List[Tuple[str, bool]]: - """ - 处理指定文件夹中的所有JSON文件 - - :param input_folder: 输入文件夹路径,包含要处理的JSON文件 - :param output_folder: 输出文件夹路径,用于保存计算结果 - :return: 处理结果列表,格式为 [(文件路径, 是否成功), ...] - """ - # 确保基础输出目录存在 - os.makedirs(output_folder, exist_ok=True) - - # 查找所有JSON文件 - json_files = [] - for file in os.listdir(input_folder): - if file.lower().endswith(".json"): - json_files.append(os.path.join(input_folder, file)) - - if not json_files: - print(f"警告: 在目录 {input_folder} 中没有找到JSON文件") - return [] - - # 处理每个JSON文件 - results = [] - for input_file in json_files: - success = process_json_file( - input_file=input_file, - base_output_dir=output_folder, # 传递基础输出目录 - category=None, # 从JSON内容中解析 - engineering_type=None, # 从JSON内容中解析 - project=None, # 处理所有项目 - adjustment_type="调差", - ) - - results.append((input_file, success)) - - return results - - -def main(): - """程序入口""" - - input_folder = "project2json/outputs/json" - output_folder = "project2json/outputs/bcl_results" - - results = process_BCL_calculate(input_folder, output_folder) - - # 显示处理结果 - success_count = sum(1 for _, success in results if success) - print(f"\n处理完成: 成功 {success_count}/{len(results)} 个文件") - - if success_count < len(results): - print("处理失败的文件:") - for file_path, success in results: - if not success: - print(f" - {os.path.basename(file_path)}") - - -if __name__ == "__main__": - main() From e0fc238765ca66e2a258621f206dac281b41bae6 Mon Sep 17 00:00:00 2001 From: chentianrui Date: Sat, 2 Aug 2025 14:54:33 +0800 Subject: [PATCH 8/9] =?UTF-8?q?=E5=88=A0=E9=99=A4=20equipment=5Fcalculatio?= =?UTF-8?q?n/software=5Fcalculators.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- equipment_calculation/software_calculators.py | 703 ------------------ 1 file changed, 703 deletions(-) delete mode 100644 equipment_calculation/software_calculators.py diff --git a/equipment_calculation/software_calculators.py b/equipment_calculation/software_calculators.py deleted file mode 100644 index d2fef44..0000000 --- a/equipment_calculation/software_calculators.py +++ /dev/null @@ -1,703 +0,0 @@ -from equipment_calculation.calculator_base import CalculatorBase -from equipment_calculation.software_types import ( - SoftwareType, - MAIN_GRID_BUDGET, - MAIN_GRID_BILL, - DISTRIBUTION_BUDGET, - DISTRIBUTION_BILL, - TECHNICAL_RENOVATION_BUDGET, - TECHNICAL_RENOVATION_BILL, -) -from equipment_calculation.calculation_strategy import CalculationStrategy, DefaultCalculationStrategy -import json -import os -from typing import Any, Dict, Optional, Set -import sys -import re - -# 添加一个全局变量和一个缓存字典来存储清单数量 -_BILL_QUANTITY = 1.0 -bill_quantity_cache = {} - - -# 为每种软件类型定义特定的计算策略 -class MainGridBudgetCalculationStrategy(DefaultCalculationStrategy): - """主网预算计算策略""" - - def preprocess_quantity_fee_node(self, node: dict) -> None: - """ - 工程量取费表节点预处理规则: - - 类型为"1"或"主材"时,供货方映射,主材属性补全 - - 类型为"5"或"设备"时,供货方映射,设备类型映射 - """ - node_type = str(node.get("类型", "")) - # 定额处理 - if node_type == "0" or node_type == "定额": - supplier = str(node.get("费用类型", "")) - if supplier == "1": - node["费用类型"] = "不取费" - elif supplier == "0": - node["费用类型"] = "取费" - - # 处理特征段 - seg_val = node.get("特征段", "") - if isinstance(seg_val, str): - num = re.search(r"\d+", seg_val) - node["特征段"] = num.group() if num else "" - - # 主材处理 - if node_type == "1" or node_type == "主材": - supplier = str(node.get("供货方", "")) - if supplier == "1": - node["供货方"] = "甲供" - elif supplier == "2": - node["供货方"] = "乙供" - # 补全主材属性 - for key in ["拆分", "设备性材料"]: - if key not in node: - node[key] = 0 - # 设备处理 - if node_type == "5" or node_type == "设备": - supplier = str(node.get("供货方", "")) - if supplier == "1": - node["供货方"] = "甲供" - elif supplier == "2": - node["供货方"] = "乙供" - # 设备类型映射 - if str(node.get("设备类型", "")) == "0": - node["设备类型"] = "普通设备" - - -class MainGridBillCalculationStrategy(DefaultCalculationStrategy): - """主网清单计算策略""" - - def __init__(self): - super().__init__() - self.bill_quantity = 1.0 # 默认值 - - def set_bill_quantity(self, quantity): - """设置清单数量""" - try: - self.bill_quantity = float(quantity) - print(f"设置计算策略的清单数量: {self.bill_quantity}") - except (TypeError, ValueError): - print(f"无法将 {quantity} 转换为浮点数") - - def calculate_external_variable(self, var_name: str, context: Any) -> float: - """计算表外变量,并乘以父级清单的数量""" - # 使用默认实现计算表外变量 - from equipment_calculation.bcl_utils import calculator - - result = calculator.calculate(var_name, context) - - # 获取清单数量 - 首先从全局变量获取 - global _BILL_QUANTITY # 在模块级别定义一个全局变量 - parent_quantity = _BILL_QUANTITY if "_BILL_QUANTITY" in globals() else 1.0 - - print(f"\n===== 获取清单数量 =====") - print(f"从全局变量获取清单数量: {parent_quantity}") - - # 如果全局变量没有设置正确的值,尝试从bill_quantity_cache获取 - if parent_quantity == 1.0 and hasattr(context, "__dict__"): - module = sys.modules.get("quantity_fee_calculator") - if module and hasattr(module, "bill_quantity_cache"): - bill_cache = getattr(module, "bill_quantity_cache") - bill_id = None - - # 尝试从上下文中获取bill_id - if hasattr(context, "bill_id"): - bill_id = context.bill_id - elif hasattr(context, "datasource") and context.datasource: - for item in context.datasource: - if hasattr(item, "object") and hasattr(item.object, "bill_id"): - bill_id = item.object.bill_id - break - - # 如果找到bill_id并且在缓存中,使用缓存的数量 - if bill_id and bill_id in bill_cache: - parent_quantity = bill_cache[bill_id] - print(f"从bill_quantity_cache获取清单{bill_id}的数量: {parent_quantity}") - - # 设置一个临时环境变量作为最后的手段 - if parent_quantity == 1.0: - import os - - env_quantity = os.environ.get("BILL_QUANTITY") - if env_quantity: - try: - parent_quantity = float(env_quantity) - print(f"从环境变量获取清单数量: {parent_quantity}") - except: - pass - - print(f"===== 获取清单数量结束 =====\n") - - # 将结果乘以父级清单的数量 - final_result = float(result) * parent_quantity if result is not None else 0.0 - print(f"表外变量 '{var_name}' 的值为 {result},父级清单数量为 {parent_quantity},最终结果为 {final_result}") - return final_result - - def preprocess_quantity_fee_node(self, node: dict) -> None: - """ - 工程量取费表节点预处理规则: - - 类型为"1"或"主材"时,供货方映射,主材属性补全 - - 类型为"5"或"设备"时,供货方映射,设备类型映射 - """ - node_type = str(node.get("类型", "")) - # 主材处理 - if node_type == "1" or node_type == "主材": - supplier = str(node.get("供货方", "")) - if supplier == "1": - node["供货方"] = "甲供" - elif supplier == "2": - node["供货方"] = "乙供" - # 补全主材属性 - for key in ["拆分", "设备性材料", "损耗", "预算价含税"]: - if key not in node: - node[key] = 0 - # 设备处理 - if node_type == "5" or node_type == "设备": - supplier = str(node.get("供货方", "")) - if supplier == "1": - node["供货方"] = "甲供" - elif supplier == "2": - node["供货方"] = "乙供" - # 设备类型映射 - if str(node.get("设备类型", "")) == "0": - node["设备类型"] = "普通设备" - - def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: - """将取费表中的'综合单价'替换为'合价'""" - try: - # 读取输出文件 - with open(output_file, "r", encoding="utf-8") as f: - data = json.load(f) - - # 递归查找并替换"综合单价"为"合价" - def replace_key(obj): - if isinstance(obj, dict): - new_obj = {} - for key, value in obj.items(): - # 替换键名 - new_key = "合价" if key == "综合单价" else key - # 递归处理值 - new_obj[new_key] = replace_key(value) - return new_obj - elif isinstance(obj, list): - return [replace_key(item) for item in obj] - else: - return obj - - # 替换所有出现的"综合单价" - new_data = replace_key(data) - - # 写回文件 - with open(output_file, "w", encoding="utf-8") as f: - json.dump(new_data, f, ensure_ascii=False, indent=2) - - print(f"已将取费表中的'综合单价'替换为'合价'") - except Exception as e: - print(f"修改取费表时出错: {e}") - - -class DistributionBudgetCalculationStrategy(DefaultCalculationStrategy): - """配网预算计算策略""" - - # def calculate_fee_base( - # self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None - # ) -> float: - # """计算取费基数,可以重写以添加配网预算特定的计算规则""" - # # 示例:如果取费基数是特定值,使用特定的计算规则 - # if fee_base == "材料费": - # print(f"应用配网预算特定的取费基数计算规则: {fee_base}") - # # 这里可以添加特定的计算逻辑 - - # # 暂时仍然使用默认实现 - # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) - - # # 对于其他取费基数,使用默认实现 - # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) - - -class DistributionBillCalculationStrategy(DefaultCalculationStrategy): - """配网清单计算策略""" - - # def calculate_fee_base( - # self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None - # ) -> float: - # """计算取费基数,可以重写以添加配网清单特定的计算规则""" - # # 示例:如果取费基数是特定值,使用特定的计算规则 - # if fee_base == "机械费": - # print(f"应用配网清单特定的取费基数计算规则: {fee_base}") - # # 这里可以添加特定的计算逻辑 - - # # 暂时仍然使用默认实现 - # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) - - # # 对于其他取费基数,使用默认实现 - # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) - - -class TechnicalRenovationBudgetCalculationStrategy(DefaultCalculationStrategy): - """技改预算计算策略""" - - # def calculate_fee_base( - # self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None - # ) -> float: - # """计算取费基数,可以重写以添加技改预算特定的计算规则""" - # # 示例:如果取费基数是特定值,使用特定的计算规则 - # if fee_base == "人工费+材料费": - # print(f"应用技改预算特定的取费基数计算规则: {fee_base}") - # # 这里可以添加特定的计算逻辑 - - # # 暂时仍然使用默认实现 - # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) - - # # 对于其他取费基数,使用默认实现 - # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) - - -class TechnicalRenovationBillCalculationStrategy(DefaultCalculationStrategy): - """技改清单计算策略""" - - # def calculate_fee_base( - # self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None - # ) -> float: - # """计算取费基数,可以重写以添加技改清单特定的计算规则""" - # # 示例:如果取费基数是特定值,使用特定的计算规则 - # if fee_base == "人工费+材料费+机械费": - # print(f"应用技改清单特定的取费基数计算规则: {fee_base}") - # # 这里可以添加特定的计算逻辑 - - # # 暂时仍然使用默认实现 - # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) - - # # 对于其他取费基数,使用默认实现 - # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) - def calculate_external_variable(self, var_name: str, context: Any) -> float: - """计算表外变量,并乘以父级清单的数量""" - # 使用默认实现计算表外变量 - from equipment_calculation.bcl_utils import calculator - - result = calculator.calculate(var_name, context) - - # 获取清单数量 - 首先从全局变量获取 - global _BILL_QUANTITY # 在模块级别定义一个全局变量 - parent_quantity = _BILL_QUANTITY if "_BILL_QUANTITY" in globals() else 1.0 - - print(f"\n===== 获取清单数量 =====") - print(f"从全局变量获取清单数量: {parent_quantity}") - - # 如果全局变量没有设置正确的值,尝试从bill_quantity_cache获取 - if parent_quantity == 1.0 and hasattr(context, "__dict__"): - module = sys.modules.get("quantity_fee_calculator") - if module and hasattr(module, "bill_quantity_cache"): - bill_cache = getattr(module, "bill_quantity_cache") - bill_id = None - - # 尝试从上下文中获取bill_id - if hasattr(context, "bill_id"): - bill_id = context.bill_id - elif hasattr(context, "datasource") and context.datasource: - for item in context.datasource: - if hasattr(item, "object") and hasattr(item.object, "bill_id"): - bill_id = item.object.bill_id - break - - # 如果找到bill_id并且在缓存中,使用缓存的数量 - if bill_id and bill_id in bill_cache: - parent_quantity = bill_cache[bill_id] - print(f"从bill_quantity_cache获取清单{bill_id}的数量: {parent_quantity}") - - # 设置一个临时环境变量作为最后的手段 - if parent_quantity == 1.0: - import os - - env_quantity = os.environ.get("BILL_QUANTITY") - if env_quantity: - try: - parent_quantity = float(env_quantity) - print(f"从环境变量获取清单数量: {parent_quantity}") - except: - pass - - print(f"===== 获取清单数量结束 =====\n") - - # 将结果乘以父级清单的数量 - final_result = float(result) * parent_quantity if result is not None else 0.0 - print(f"表外变量 '{var_name}' 的值为 {result},父级清单数量为 {parent_quantity},最终结果为 {final_result}") - return final_result - - def preprocess_quantity_fee_node(self, node: dict) -> None: - """ - 工程量取费表节点预处理规则: - - 类型为"1"或"主材"时,供货方映射,主材属性补全 - - 类型为"5"或"设备"时,供货方映射,设备类型映射 - """ - node_type = str(node.get("类型", "")) - # 主材处理 - if node_type == "1" or node_type == "主材": - supplier = str(node.get("供货方", "")) - if supplier == "1": - node["供货方"] = "甲供" - elif supplier == "2": - node["供货方"] = "乙供" - # 补全主材属性 - for key in ["拆分", "设备性材料", "损耗", "预算价含税"]: - if key not in node: - node[key] = 0 - # 设备处理 - if node_type == "5" or node_type == "设备": - supplier = str(node.get("供货方", "")) - if supplier == "1": - node["供货方"] = "甲供" - elif supplier == "2": - node["供货方"] = "乙供" - # 设备类型映射 - if str(node.get("设备类型", "")) == "0": - node["设备类型"] = "普通设备" - - def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: - """将取费表中的'综合单价'替换为'合价'""" - try: - # 读取输出文件 - with open(output_file, "r", encoding="utf-8") as f: - data = json.load(f) - - # 递归查找并替换"综合单价"为"合价" - def replace_key(obj): - if isinstance(obj, dict): - new_obj = {} - for key, value in obj.items(): - # 替换键名 - new_key = "合价" if key == "综合单价" else key - # 递归处理值 - new_obj[new_key] = replace_key(value) - return new_obj - elif isinstance(obj, list): - return [replace_key(item) for item in obj] - else: - return obj - - # 替换所有出现的"综合单价" - new_data = replace_key(data) - - # 写回文件 - with open(output_file, "w", encoding="utf-8") as f: - json.dump(new_data, f, ensure_ascii=False, indent=2) - - print(f"已将取费表中的'综合单价'替换为'合价'") - except Exception as e: - print(f"修改取费表时出错: {e}") - - -class MainGridBudgetCalculator(CalculatorBase): - """主网预算计算器""" - - def __init__(self): - super().__init__(MAIN_GRID_BUDGET) - - def create_calculation_strategy(self) -> CalculationStrategy: - """创建主网预算计算策略""" - return MainGridBudgetCalculationStrategy() - - def set_output_dir(self, output_dir): - """ - 设置计算结果的输出目录 - - :param output_dir: 输出目录的路径 - """ - self.output_dir = output_dir - - def apply_quantity_fee_rules(self) -> None: - """应用主网预算特定的工程量取费规则""" - print(f"应用{self.software_type.name}特定的工程量取费规则 - 对节点进行预处理") - - # 创建预处理函数 - def preprocess_nodes_recursive(json_file_path): - try: - # 读取 JSON 文件 - with open(json_file_path, "r", encoding="utf-8") as f: - data = json.load(f) - - # 获取工程量数据 - project_data = data.get("projectData", {}) - quantity_data = project_data.get("quantity", {}) - - # 递归处理所有节点 - def process_node(node): - # 调用计算策略的预处理方法 - if isinstance(node, dict): - self.calculation_strategy.preprocess_quantity_fee_node(node) - # 处理子节点 - if "children" in node and isinstance(node["children"], list): - for child in node["children"]: - process_node(child) - - # 处理所有工程量节点 - process_node(quantity_data) - - # 写回 JSON 文件 - with open(json_file_path, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False, indent=2) - - print(f"已完成对 {json_file_path} 中所有节点的预处理") - except Exception as e: - print(f"预处理节点时出错: {e}") - - # 不在这里调用预处理函数,因为我们还不知道 JSON 文件路径 - # 这个函数会在 calculate_quantity_fee_tables 中调用 - # preprocess_nodes_recursive(json_file_path) - - # 只需实现一个空函数,返回预处理函数以便后续调用 - return preprocess_nodes_recursive - - def apply_resource_fee_rules(self) -> None: - """应用主网预算特定的人材机合价规则""" - print(f"应用{self.software_type.name}特定的人材机合价规则") - - def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: - """应用主网预算特定的工程量取费后处理规则""" - print(f"应用{self.software_type.name}特定的工程量取费后处理规则") - - def post_process_resource_fees(self, output_file: str, project_name: str) -> None: - """应用主网预算特定的人材机合价后处理规则""" - print(f"应用{self.software_type.name}特定的人材机合价后处理规则") - - -class MainGridBillCalculator(CalculatorBase): - """主网清单计算器""" - - def __init__(self): - super().__init__(MAIN_GRID_BILL) - - def create_calculation_strategy(self) -> CalculationStrategy: - """创建主网清单计算策略""" - return MainGridBillCalculationStrategy() - - def set_output_dir(self, output_dir): - """ - 设置计算结果的输出目录 - - :param output_dir: 输出目录的路径 - """ - self.output_dir = output_dir - - def apply_quantity_fee_rules(self) -> None: - """应用主网清单特定的工程量取费规则""" - print(f"应用{self.software_type.name}特定的工程量取费规则 - 对节点进行预处理") - - # 创建预处理函数,这个函数会处理JSON文件中的节点 - def preprocess_nodes_recursive(json_file_path): - try: - # 读取 JSON 文件 - with open(json_file_path, "r", encoding="utf-8") as f: - data = json.load(f) - - # 获取工程量数据 - project_data = data.get("projectData", {}) - quantity_data = project_data.get("quantity", {}) - bill_data = project_data.get("bill", {}) - - print(f"开始预处理工程量和清单节点...") - - # 递归处理所有节点 - def process_node(node): - if isinstance(node, dict): - self.calculation_strategy.preprocess_quantity_fee_node(node) - if "children" in node and isinstance(node["children"], list): - for child in node["children"]: - process_node(child) - - # 处理工程量节点 - process_node(quantity_data) - # 也处理清单节点,因为对于清单工程,清单节点信息也很重要 - process_node(bill_data) - - # 写回文件 - with open(json_file_path, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False, indent=2) - - print(f"已完成对 {json_file_path} 中所有节点的预处理") - except Exception as e: - print(f"预处理节点时出错: {e}") - import traceback - - traceback.print_exc() - - # 返回预处理函数,供 calculate_quantity_fee_tables 调用 - return preprocess_nodes_recursive - - def apply_resource_fee_rules(self) -> None: - """应用主网清单特定的人材机合价规则""" - print(f"应用{self.software_type.name}特定的人材机合价规则") - - def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: - """应用主网清单特定的工程量取费后处理规则""" - print(f"应用{self.software_type.name}特定的工程量取费后处理规则") - - def post_process_resource_fees(self, output_file: str, project_name: str) -> None: - """应用主网清单特定的人材机合价后处理规则""" - print(f"应用{self.software_type.name}特定的人材机合价后处理规则") - - -class DistributionBudgetCalculator(CalculatorBase): - """配网预算计算器""" - - def __init__(self): - super().__init__(DISTRIBUTION_BUDGET) - - def set_output_dir(self, output_dir): - """ - 设置计算结果的输出目录 - - :param output_dir: 输出目录的路径 - """ - self.output_dir = output_dir - - def create_calculation_strategy(self) -> CalculationStrategy: - """创建配网预算计算策略""" - return DistributionBudgetCalculationStrategy() - - def apply_quantity_fee_rules(self) -> None: - """应用配网预算特定的工程量取费规则""" - print(f"应用{self.software_type.name}特定的工程量取费规则") - - def apply_resource_fee_rules(self) -> None: - """应用配网预算特定的人材机合价规则""" - print(f"应用{self.software_type.name}特定的人材机合价规则") - - def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: - """应用配网预算特定的工程量取费后处理规则""" - print(f"应用{self.software_type.name}特定的工程量取费后处理规则") - - def post_process_resource_fees(self, output_file: str, project_name: str) -> None: - """应用配网预算特定的人材机合价后处理规则""" - print(f"应用{self.software_type.name}特定的人材机合价后处理规则") - - -class DistributionBillCalculator(CalculatorBase): - """配网清单计算器""" - - def __init__(self): - super().__init__(DISTRIBUTION_BILL) - - def create_calculation_strategy(self) -> CalculationStrategy: - """创建配网清单计算策略""" - return DistributionBillCalculationStrategy() - - def set_output_dir(self, output_dir): - """ - 设置计算结果的输出目录 - - :param output_dir: 输出目录的路径 - """ - self.output_dir = output_dir - - def apply_quantity_fee_rules(self) -> None: - """应用配网清单特定的工程量取费规则""" - print(f"应用{self.software_type.name}特定的工程量取费规则") - - def apply_resource_fee_rules(self) -> None: - """应用配网清单特定的人材机合价规则""" - print(f"应用{self.software_type.name}特定的人材机合价规则") - - def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: - """应用配网清单特定的工程量取费后处理规则""" - print(f"应用{self.software_type.name}特定的工程量取费后处理规则") - - def post_process_resource_fees(self, output_file: str, project_name: str) -> None: - """应用配网清单特定的人材机合价后处理规则""" - print(f"应用{self.software_type.name}特定的人材机合价后处理规则") - - -class TechnicalRenovationBudgetCalculator(CalculatorBase): - """技改预算计算器""" - - def __init__(self): - super().__init__(TECHNICAL_RENOVATION_BUDGET) - - def create_calculation_strategy(self) -> CalculationStrategy: - """创建技改预算计算策略""" - return TechnicalRenovationBudgetCalculationStrategy() - - def set_output_dir(self, output_dir): - """ - 设置计算结果的输出目录 - - :param output_dir: 输出目录的路径 - """ - self.output_dir = output_dir - - def apply_quantity_fee_rules(self) -> None: - """应用技改预算特定的工程量取费规则""" - print(f"应用{self.software_type.name}特定的工程量取费规则") - - def apply_resource_fee_rules(self) -> None: - """应用技改预算特定的人材机合价规则""" - print(f"应用{self.software_type.name}特定的人材机合价规则") - - def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: - """应用技改预算特定的工程量取费后处理规则""" - print(f"应用{self.software_type.name}特定的工程量取费后处理规则") - - def post_process_resource_fees(self, output_file: str, project_name: str) -> None: - """应用技改预算特定的人材机合价后处理规则""" - print(f"应用{self.software_type.name}特定的人材机合价后处理规则") - - -class TechnicalRenovationBillCalculator(CalculatorBase): - """技改清单计算器""" - - def __init__(self): - super().__init__(TECHNICAL_RENOVATION_BILL) - - def create_calculation_strategy(self) -> CalculationStrategy: - """创建技改清单计算策略""" - return TechnicalRenovationBillCalculationStrategy() - - def set_output_dir(self, output_dir): - """ - 设置计算结果的输出目录 - - :param output_dir: 输出目录的路径 - """ - self.output_dir = output_dir - - def apply_quantity_fee_rules(self) -> None: - """应用技改清单特定的工程量取费规则""" - print(f"应用{self.software_type.name}特定的工程量取费规则") - - def apply_resource_fee_rules(self) -> None: - """应用技改清单特定的人材机合价规则""" - print(f"应用{self.software_type.name}特定的人材机合价规则") - - def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: - """应用技改清单特定的工程量取费后处理规则""" - print(f"应用{self.software_type.name}特定的工程量取费后处理规则") - - def post_process_resource_fees(self, output_file: str, project_name: str) -> None: - """应用技改清单特定的人材机合价后处理规则""" - print(f"应用{self.software_type.name}特定的人材机合价后处理规则") - - -def get_calculator(software_type: SoftwareType) -> CalculatorBase: - """ - 根据软件类型获取对应的计算器 - - Args: - software_type: 软件类型 - - Returns: - CalculatorBase: 计算器实例 - """ - calculators = { - MAIN_GRID_BUDGET.name: MainGridBudgetCalculator(), - MAIN_GRID_BILL.name: MainGridBillCalculator(), - DISTRIBUTION_BUDGET.name: DistributionBudgetCalculator(), - DISTRIBUTION_BILL.name: DistributionBillCalculator(), - TECHNICAL_RENOVATION_BUDGET.name: TechnicalRenovationBudgetCalculator(), - TECHNICAL_RENOVATION_BILL.name: TechnicalRenovationBillCalculator(), - } - - return calculators.get(software_type.name) From c50ebec75f2da9c6d6ee74103af0ce2cf085b9ff Mon Sep 17 00:00:00 2001 From: chentianrui Date: Sat, 2 Aug 2025 14:55:07 +0800 Subject: [PATCH 9/9] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E8=87=B3=20equipment=5Fcalculation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- equipment_calculation/main.py | 315 ++++++++ equipment_calculation/software_calculators.py | 703 ++++++++++++++++++ 2 files changed, 1018 insertions(+) create mode 100644 equipment_calculation/main.py create mode 100644 equipment_calculation/software_calculators.py diff --git a/equipment_calculation/main.py b/equipment_calculation/main.py new file mode 100644 index 0000000..831f60f --- /dev/null +++ b/equipment_calculation/main.py @@ -0,0 +1,315 @@ +""" +第二步:计算bcl结果 +""" + +import os +import argparse +import re +import json +from typing import Dict, List, Any, Optional, Tuple +from equipment_calculation.software_types import ( + get_software_type, + SoftwareCategory, + EngineeringType, + ALL_SOFTWARE_TYPES, +) +from equipment_calculation.software_calculators import get_calculator + + +def parse_arguments(): + """解析命令行参数""" + parser = argparse.ArgumentParser(description="工程量取费和人材机合价计算程序") + + # 软件类型参数 + parser.add_argument( + "--category", choices=["主网", "配网", "技改"], default="主网", help="软件类别(主网/配网/技改)" + ) + + parser.add_argument("--engineering-type", choices=["预算", "清单"], default="清单", help="工程类型(预算/清单)") + + # 计算类型参数 + parser.add_argument( + "--calculate", + choices=["all", "quantity", "resource"], + default="quantity", + help="计算类型(all: 全部, quantity: 工程量取费, resource: 人材机合价)", + ) + + # 项目参数 + parser.add_argument("--project", help="项目名称,如果不指定则处理所有项目") + + parser.add_argument("--adjustment-type", default="调差", help="调差类型,默认为'调差'") + + # 输入文件参数 + parser.add_argument( + "--input-file", + default="测试案例/主网清单变电.json", + help="输入JSON文件路径,如果不指定则使用默认路径", + ) + + # 添加输入和输出文件夹参数 + parser.add_argument("--input-folder", help="输入文件夹路径,包含要处理的JSON文件") + parser.add_argument("--output-folder", default="计算结果", help="输出文件夹路径,用于保存计算结果") + + return parser.parse_args() + + +def parse_filename(filename: str) -> Tuple[str, str]: + """ + 从文件名中解析软件类别和工程类型(备用方法) + + :param filename: JSON文件名,例如"主网预算变电.json" + :return: (category, engineering_type) 元组,例如 ("主网", "预算") + """ + # 移除扩展名 + basename = os.path.splitext(filename)[0] + + # 查找类别(主网/配网/技改) + category = None + for cat in ["主网", "配网", "技改"]: + if cat in basename: + category = cat + break + + # 查找工程类型(预算/清单) + engineering_type = None + for eng_type in ["预算", "清单"]: + if eng_type in basename: + engineering_type = eng_type + break + + # 如果未找到,使用默认值 + if not category: + print(f"警告: 无法从文件名 '{filename}' 中解析软件类别,使用默认值 '主网'") + category = "主网" + + if not engineering_type: + print(f"警告: 无法从文件名 '{filename}' 中解析工程类型,使用默认值 '清单'") + engineering_type = "清单" + + return category, engineering_type + + +def parse_json_content(json_file_path: str) -> Tuple[Optional[str], Optional[str]]: + """ + 从JSON文件内容中解析软件类别和工程类型 + + :param json_file_path: JSON文件路径 + :return: (category, engineering_type) 元组,如果解析失败则返回 (None, None) + """ + try: + with open(json_file_path, "r", encoding="utf-8") as f: + data = json.load(f) + + # 定义阶段类型映射表 + budget_types = ["概预算", "定额", "估算", "概算"] + list_types = ["清单", "结算", "招标控制价", "招投标工程"] + + # 从division字段获取软件名称和阶段类型 + if "division" in data: + division = data["division"] + print(f"找到division字段: {division}") + + # 使用-分割division字段 + parts = division.split("-") + if len(parts) >= 2: + category = parts[0].strip() + stage_type = parts[1].strip() + + # 验证软件类别 + if category not in ["主网", "配网", "技改"]: + print(f"警告: division中的软件名称 '{category}' 不是有效值,将使用默认值 '主网'") + category = "主网" + + # 映射阶段类型 + if any(budget_type in stage_type for budget_type in budget_types): + engineering_type = "预算" + elif any(list_type in stage_type for list_type in list_types): + engineering_type = "清单" + else: + print(f"警告: division中的阶段类型 '{stage_type}' 无法映射到预算或清单,将使用默认值 '清单'") + engineering_type = "清单" + + print(f"从division解析: 软件名称={category}, 阶段类型={engineering_type} (原始值: {stage_type})") + return category, engineering_type + else: + print(f"警告: division字段 '{division}' 格式不正确,无法分割") + else: + print(f"警告: JSON文件中未找到division字段,尝试从basicData中解析") + + # 作为备选,尝试从basicData中获取 + if "basicData" in data: + basic_data = data["basicData"] + category = basic_data.get("软件名称") + engineering_type = basic_data.get("阶段类型") + + # 验证解析结果 + if category and engineering_type: + # 确保category是有效值 + if category not in ["主网", "配网", "技改"]: + print(f"警告: basicData中的软件名称 '{category}' 不是有效值,将使用默认值 '主网'") + category = "主网" + + # 确保engineering_type是有效值 + if engineering_type not in ["预算", "清单"]: + print(f"警告: basicData中的阶段类型 '{engineering_type}' 不是有效值,将使用默认值 '清单'") + engineering_type = "清单" + + print(f"从basicData解析: 软件名称={category}, 阶段类型={engineering_type}") + return category, engineering_type + else: + print(f"警告: basicData中未找到软件名称或阶段类型") + else: + print(f"警告: JSON文件中未找到basicData部分") + + return None, None + + except Exception as e: + print(f"解析JSON文件内容时出错: {str(e)}") + return None, None + + +def process_json_file( + input_file: str, + base_output_dir: str = "计算结果", + category: Optional[str] = None, + engineering_type: Optional[str] = None, + project: Optional[str] = None, + adjustment_type: str = "调差", +) -> bool: + """ + 处理单个JSON文件 + + :param input_file: 输入JSON文件路径 + :param base_output_dir: 基础输出目录路径,实际结果会保存在此目录下的子文件夹中 + :param category: 软件类别,如果为None则从JSON内容中解析 + :param engineering_type: 工程类型,如果为None则从JSON内容中解析 + :param project: 项目名称,如果为None则处理所有项目 + :param adjustment_type: 调差类型 + :return: 处理是否成功 + """ + try: + # 获取文件名(不含扩展名)作为输出子文件夹的名称 + filename = os.path.basename(input_file) + file_basename = os.path.splitext(filename)[0] + output_dir = os.path.join(base_output_dir, file_basename) + + # 创建输出子文件夹 + os.makedirs(output_dir, exist_ok=True) + print(f"创建输出目录: {output_dir}") + + # 如果未指定category或engineering_type,从JSON内容中解析 + if category is None or engineering_type is None: + # 首先尝试从JSON内容中解析 + json_category, json_engineering_type = parse_json_content(input_file) + + if category is None: + if json_category: + category = json_category + else: + # 如果从JSON内容中解析失败,尝试从文件名中解析(作为备用) + print("从JSON内容解析软件名称失败,尝试从文件名解析...") + parsed_category, _ = parse_filename(filename) + category = parsed_category + + if engineering_type is None: + if json_engineering_type: + engineering_type = json_engineering_type + else: + # 如果从JSON内容中解析失败,尝试从文件名中解析(作为备用) + print("从JSON内容解析阶段类型失败,尝试从文件名解析...") + _, parsed_engineering_type = parse_filename(filename) + engineering_type = parsed_engineering_type + + print(f"处理文件: {input_file}") + print(f" 软件类别: {category}") + print(f" 工程类型: {engineering_type}") + + # 获取软件类型 + software_type = get_software_type(category, engineering_type) + print(f" 使用软件类型: {software_type.name}") + + # 获取计算器 + calculator = get_calculator(software_type) + if not calculator: + print(f" 错误: 未找到软件类型 {software_type.name} 的计算器") + return False + + # 设置输出目录 + calculator.set_output_dir(output_dir) + + # 执行计算 + print(" 开始计算工程量取费表...") + calculator.calculate_quantity_fee_tables(json_file_path=input_file, project_name=project) + + print(" 开始计算人材机合价...") + calculator.calculate_resource_fee_tables(json_file_path=input_file, project_name=project) + + print(f" 处理完成: {input_file}") + print(f" 结果保存在: {output_dir}") + return True + + except Exception as e: + print(f" 处理文件 {input_file} 时出错: {str(e)}") + return False + + +def process_BCL_calculate(input_folder: str, output_folder: str) -> List[Tuple[str, bool]]: + """ + 处理指定文件夹中的所有JSON文件 + + :param input_folder: 输入文件夹路径,包含要处理的JSON文件 + :param output_folder: 输出文件夹路径,用于保存计算结果 + :return: 处理结果列表,格式为 [(文件路径, 是否成功), ...] + """ + # 确保基础输出目录存在 + os.makedirs(output_folder, exist_ok=True) + + # 查找所有JSON文件 + json_files = [] + for file in os.listdir(input_folder): + if file.lower().endswith(".json"): + json_files.append(os.path.join(input_folder, file)) + + if not json_files: + print(f"警告: 在目录 {input_folder} 中没有找到JSON文件") + return [] + + # 处理每个JSON文件 + results = [] + for input_file in json_files: + success = process_json_file( + input_file=input_file, + base_output_dir=output_folder, # 传递基础输出目录 + category=None, # 从JSON内容中解析 + engineering_type=None, # 从JSON内容中解析 + project=None, # 处理所有项目 + adjustment_type="调差", + ) + + results.append((input_file, success)) + + return results + + +def main(): + """程序入口""" + + input_folder = "project2json/outputs/json" + output_folder = "project2json/outputs/bcl_results" + + results = process_BCL_calculate(input_folder, output_folder) + + # 显示处理结果 + success_count = sum(1 for _, success in results if success) + print(f"\n处理完成: 成功 {success_count}/{len(results)} 个文件") + + if success_count < len(results): + print("处理失败的文件:") + for file_path, success in results: + if not success: + print(f" - {os.path.basename(file_path)}") + + +if __name__ == "__main__": + main() diff --git a/equipment_calculation/software_calculators.py b/equipment_calculation/software_calculators.py new file mode 100644 index 0000000..d2fef44 --- /dev/null +++ b/equipment_calculation/software_calculators.py @@ -0,0 +1,703 @@ +from equipment_calculation.calculator_base import CalculatorBase +from equipment_calculation.software_types import ( + SoftwareType, + MAIN_GRID_BUDGET, + MAIN_GRID_BILL, + DISTRIBUTION_BUDGET, + DISTRIBUTION_BILL, + TECHNICAL_RENOVATION_BUDGET, + TECHNICAL_RENOVATION_BILL, +) +from equipment_calculation.calculation_strategy import CalculationStrategy, DefaultCalculationStrategy +import json +import os +from typing import Any, Dict, Optional, Set +import sys +import re + +# 添加一个全局变量和一个缓存字典来存储清单数量 +_BILL_QUANTITY = 1.0 +bill_quantity_cache = {} + + +# 为每种软件类型定义特定的计算策略 +class MainGridBudgetCalculationStrategy(DefaultCalculationStrategy): + """主网预算计算策略""" + + def preprocess_quantity_fee_node(self, node: dict) -> None: + """ + 工程量取费表节点预处理规则: + - 类型为"1"或"主材"时,供货方映射,主材属性补全 + - 类型为"5"或"设备"时,供货方映射,设备类型映射 + """ + node_type = str(node.get("类型", "")) + # 定额处理 + if node_type == "0" or node_type == "定额": + supplier = str(node.get("费用类型", "")) + if supplier == "1": + node["费用类型"] = "不取费" + elif supplier == "0": + node["费用类型"] = "取费" + + # 处理特征段 + seg_val = node.get("特征段", "") + if isinstance(seg_val, str): + num = re.search(r"\d+", seg_val) + node["特征段"] = num.group() if num else "" + + # 主材处理 + if node_type == "1" or node_type == "主材": + supplier = str(node.get("供货方", "")) + if supplier == "1": + node["供货方"] = "甲供" + elif supplier == "2": + node["供货方"] = "乙供" + # 补全主材属性 + for key in ["拆分", "设备性材料"]: + if key not in node: + node[key] = 0 + # 设备处理 + if node_type == "5" or node_type == "设备": + supplier = str(node.get("供货方", "")) + if supplier == "1": + node["供货方"] = "甲供" + elif supplier == "2": + node["供货方"] = "乙供" + # 设备类型映射 + if str(node.get("设备类型", "")) == "0": + node["设备类型"] = "普通设备" + + +class MainGridBillCalculationStrategy(DefaultCalculationStrategy): + """主网清单计算策略""" + + def __init__(self): + super().__init__() + self.bill_quantity = 1.0 # 默认值 + + def set_bill_quantity(self, quantity): + """设置清单数量""" + try: + self.bill_quantity = float(quantity) + print(f"设置计算策略的清单数量: {self.bill_quantity}") + except (TypeError, ValueError): + print(f"无法将 {quantity} 转换为浮点数") + + def calculate_external_variable(self, var_name: str, context: Any) -> float: + """计算表外变量,并乘以父级清单的数量""" + # 使用默认实现计算表外变量 + from equipment_calculation.bcl_utils import calculator + + result = calculator.calculate(var_name, context) + + # 获取清单数量 - 首先从全局变量获取 + global _BILL_QUANTITY # 在模块级别定义一个全局变量 + parent_quantity = _BILL_QUANTITY if "_BILL_QUANTITY" in globals() else 1.0 + + print(f"\n===== 获取清单数量 =====") + print(f"从全局变量获取清单数量: {parent_quantity}") + + # 如果全局变量没有设置正确的值,尝试从bill_quantity_cache获取 + if parent_quantity == 1.0 and hasattr(context, "__dict__"): + module = sys.modules.get("quantity_fee_calculator") + if module and hasattr(module, "bill_quantity_cache"): + bill_cache = getattr(module, "bill_quantity_cache") + bill_id = None + + # 尝试从上下文中获取bill_id + if hasattr(context, "bill_id"): + bill_id = context.bill_id + elif hasattr(context, "datasource") and context.datasource: + for item in context.datasource: + if hasattr(item, "object") and hasattr(item.object, "bill_id"): + bill_id = item.object.bill_id + break + + # 如果找到bill_id并且在缓存中,使用缓存的数量 + if bill_id and bill_id in bill_cache: + parent_quantity = bill_cache[bill_id] + print(f"从bill_quantity_cache获取清单{bill_id}的数量: {parent_quantity}") + + # 设置一个临时环境变量作为最后的手段 + if parent_quantity == 1.0: + import os + + env_quantity = os.environ.get("BILL_QUANTITY") + if env_quantity: + try: + parent_quantity = float(env_quantity) + print(f"从环境变量获取清单数量: {parent_quantity}") + except: + pass + + print(f"===== 获取清单数量结束 =====\n") + + # 将结果乘以父级清单的数量 + final_result = float(result) * parent_quantity if result is not None else 0.0 + print(f"表外变量 '{var_name}' 的值为 {result},父级清单数量为 {parent_quantity},最终结果为 {final_result}") + return final_result + + def preprocess_quantity_fee_node(self, node: dict) -> None: + """ + 工程量取费表节点预处理规则: + - 类型为"1"或"主材"时,供货方映射,主材属性补全 + - 类型为"5"或"设备"时,供货方映射,设备类型映射 + """ + node_type = str(node.get("类型", "")) + # 主材处理 + if node_type == "1" or node_type == "主材": + supplier = str(node.get("供货方", "")) + if supplier == "1": + node["供货方"] = "甲供" + elif supplier == "2": + node["供货方"] = "乙供" + # 补全主材属性 + for key in ["拆分", "设备性材料", "损耗", "预算价含税"]: + if key not in node: + node[key] = 0 + # 设备处理 + if node_type == "5" or node_type == "设备": + supplier = str(node.get("供货方", "")) + if supplier == "1": + node["供货方"] = "甲供" + elif supplier == "2": + node["供货方"] = "乙供" + # 设备类型映射 + if str(node.get("设备类型", "")) == "0": + node["设备类型"] = "普通设备" + + def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: + """将取费表中的'综合单价'替换为'合价'""" + try: + # 读取输出文件 + with open(output_file, "r", encoding="utf-8") as f: + data = json.load(f) + + # 递归查找并替换"综合单价"为"合价" + def replace_key(obj): + if isinstance(obj, dict): + new_obj = {} + for key, value in obj.items(): + # 替换键名 + new_key = "合价" if key == "综合单价" else key + # 递归处理值 + new_obj[new_key] = replace_key(value) + return new_obj + elif isinstance(obj, list): + return [replace_key(item) for item in obj] + else: + return obj + + # 替换所有出现的"综合单价" + new_data = replace_key(data) + + # 写回文件 + with open(output_file, "w", encoding="utf-8") as f: + json.dump(new_data, f, ensure_ascii=False, indent=2) + + print(f"已将取费表中的'综合单价'替换为'合价'") + except Exception as e: + print(f"修改取费表时出错: {e}") + + +class DistributionBudgetCalculationStrategy(DefaultCalculationStrategy): + """配网预算计算策略""" + + # def calculate_fee_base( + # self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None + # ) -> float: + # """计算取费基数,可以重写以添加配网预算特定的计算规则""" + # # 示例:如果取费基数是特定值,使用特定的计算规则 + # if fee_base == "材料费": + # print(f"应用配网预算特定的取费基数计算规则: {fee_base}") + # # 这里可以添加特定的计算逻辑 + + # # 暂时仍然使用默认实现 + # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) + + # # 对于其他取费基数,使用默认实现 + # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) + + +class DistributionBillCalculationStrategy(DefaultCalculationStrategy): + """配网清单计算策略""" + + # def calculate_fee_base( + # self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None + # ) -> float: + # """计算取费基数,可以重写以添加配网清单特定的计算规则""" + # # 示例:如果取费基数是特定值,使用特定的计算规则 + # if fee_base == "机械费": + # print(f"应用配网清单特定的取费基数计算规则: {fee_base}") + # # 这里可以添加特定的计算逻辑 + + # # 暂时仍然使用默认实现 + # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) + + # # 对于其他取费基数,使用默认实现 + # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) + + +class TechnicalRenovationBudgetCalculationStrategy(DefaultCalculationStrategy): + """技改预算计算策略""" + + # def calculate_fee_base( + # self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None + # ) -> float: + # """计算取费基数,可以重写以添加技改预算特定的计算规则""" + # # 示例:如果取费基数是特定值,使用特定的计算规则 + # if fee_base == "人工费+材料费": + # print(f"应用技改预算特定的取费基数计算规则: {fee_base}") + # # 这里可以添加特定的计算逻辑 + + # # 暂时仍然使用默认实现 + # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) + + # # 对于其他取费基数,使用默认实现 + # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) + + +class TechnicalRenovationBillCalculationStrategy(DefaultCalculationStrategy): + """技改清单计算策略""" + + # def calculate_fee_base( + # self, fee_base: str, cost_table: dict, project_node: dict, context, in_calculation=None + # ) -> float: + # """计算取费基数,可以重写以添加技改清单特定的计算规则""" + # # 示例:如果取费基数是特定值,使用特定的计算规则 + # if fee_base == "人工费+材料费+机械费": + # print(f"应用技改清单特定的取费基数计算规则: {fee_base}") + # # 这里可以添加特定的计算逻辑 + + # # 暂时仍然使用默认实现 + # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) + + # # 对于其他取费基数,使用默认实现 + # return super().calculate_fee_base(fee_base, cost_table, project_node, context, in_calculation) + def calculate_external_variable(self, var_name: str, context: Any) -> float: + """计算表外变量,并乘以父级清单的数量""" + # 使用默认实现计算表外变量 + from equipment_calculation.bcl_utils import calculator + + result = calculator.calculate(var_name, context) + + # 获取清单数量 - 首先从全局变量获取 + global _BILL_QUANTITY # 在模块级别定义一个全局变量 + parent_quantity = _BILL_QUANTITY if "_BILL_QUANTITY" in globals() else 1.0 + + print(f"\n===== 获取清单数量 =====") + print(f"从全局变量获取清单数量: {parent_quantity}") + + # 如果全局变量没有设置正确的值,尝试从bill_quantity_cache获取 + if parent_quantity == 1.0 and hasattr(context, "__dict__"): + module = sys.modules.get("quantity_fee_calculator") + if module and hasattr(module, "bill_quantity_cache"): + bill_cache = getattr(module, "bill_quantity_cache") + bill_id = None + + # 尝试从上下文中获取bill_id + if hasattr(context, "bill_id"): + bill_id = context.bill_id + elif hasattr(context, "datasource") and context.datasource: + for item in context.datasource: + if hasattr(item, "object") and hasattr(item.object, "bill_id"): + bill_id = item.object.bill_id + break + + # 如果找到bill_id并且在缓存中,使用缓存的数量 + if bill_id and bill_id in bill_cache: + parent_quantity = bill_cache[bill_id] + print(f"从bill_quantity_cache获取清单{bill_id}的数量: {parent_quantity}") + + # 设置一个临时环境变量作为最后的手段 + if parent_quantity == 1.0: + import os + + env_quantity = os.environ.get("BILL_QUANTITY") + if env_quantity: + try: + parent_quantity = float(env_quantity) + print(f"从环境变量获取清单数量: {parent_quantity}") + except: + pass + + print(f"===== 获取清单数量结束 =====\n") + + # 将结果乘以父级清单的数量 + final_result = float(result) * parent_quantity if result is not None else 0.0 + print(f"表外变量 '{var_name}' 的值为 {result},父级清单数量为 {parent_quantity},最终结果为 {final_result}") + return final_result + + def preprocess_quantity_fee_node(self, node: dict) -> None: + """ + 工程量取费表节点预处理规则: + - 类型为"1"或"主材"时,供货方映射,主材属性补全 + - 类型为"5"或"设备"时,供货方映射,设备类型映射 + """ + node_type = str(node.get("类型", "")) + # 主材处理 + if node_type == "1" or node_type == "主材": + supplier = str(node.get("供货方", "")) + if supplier == "1": + node["供货方"] = "甲供" + elif supplier == "2": + node["供货方"] = "乙供" + # 补全主材属性 + for key in ["拆分", "设备性材料", "损耗", "预算价含税"]: + if key not in node: + node[key] = 0 + # 设备处理 + if node_type == "5" or node_type == "设备": + supplier = str(node.get("供货方", "")) + if supplier == "1": + node["供货方"] = "甲供" + elif supplier == "2": + node["供货方"] = "乙供" + # 设备类型映射 + if str(node.get("设备类型", "")) == "0": + node["设备类型"] = "普通设备" + + def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: + """将取费表中的'综合单价'替换为'合价'""" + try: + # 读取输出文件 + with open(output_file, "r", encoding="utf-8") as f: + data = json.load(f) + + # 递归查找并替换"综合单价"为"合价" + def replace_key(obj): + if isinstance(obj, dict): + new_obj = {} + for key, value in obj.items(): + # 替换键名 + new_key = "合价" if key == "综合单价" else key + # 递归处理值 + new_obj[new_key] = replace_key(value) + return new_obj + elif isinstance(obj, list): + return [replace_key(item) for item in obj] + else: + return obj + + # 替换所有出现的"综合单价" + new_data = replace_key(data) + + # 写回文件 + with open(output_file, "w", encoding="utf-8") as f: + json.dump(new_data, f, ensure_ascii=False, indent=2) + + print(f"已将取费表中的'综合单价'替换为'合价'") + except Exception as e: + print(f"修改取费表时出错: {e}") + + +class MainGridBudgetCalculator(CalculatorBase): + """主网预算计算器""" + + def __init__(self): + super().__init__(MAIN_GRID_BUDGET) + + def create_calculation_strategy(self) -> CalculationStrategy: + """创建主网预算计算策略""" + return MainGridBudgetCalculationStrategy() + + def set_output_dir(self, output_dir): + """ + 设置计算结果的输出目录 + + :param output_dir: 输出目录的路径 + """ + self.output_dir = output_dir + + def apply_quantity_fee_rules(self) -> None: + """应用主网预算特定的工程量取费规则""" + print(f"应用{self.software_type.name}特定的工程量取费规则 - 对节点进行预处理") + + # 创建预处理函数 + def preprocess_nodes_recursive(json_file_path): + try: + # 读取 JSON 文件 + with open(json_file_path, "r", encoding="utf-8") as f: + data = json.load(f) + + # 获取工程量数据 + project_data = data.get("projectData", {}) + quantity_data = project_data.get("quantity", {}) + + # 递归处理所有节点 + def process_node(node): + # 调用计算策略的预处理方法 + if isinstance(node, dict): + self.calculation_strategy.preprocess_quantity_fee_node(node) + # 处理子节点 + if "children" in node and isinstance(node["children"], list): + for child in node["children"]: + process_node(child) + + # 处理所有工程量节点 + process_node(quantity_data) + + # 写回 JSON 文件 + with open(json_file_path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + print(f"已完成对 {json_file_path} 中所有节点的预处理") + except Exception as e: + print(f"预处理节点时出错: {e}") + + # 不在这里调用预处理函数,因为我们还不知道 JSON 文件路径 + # 这个函数会在 calculate_quantity_fee_tables 中调用 + # preprocess_nodes_recursive(json_file_path) + + # 只需实现一个空函数,返回预处理函数以便后续调用 + return preprocess_nodes_recursive + + def apply_resource_fee_rules(self) -> None: + """应用主网预算特定的人材机合价规则""" + print(f"应用{self.software_type.name}特定的人材机合价规则") + + def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: + """应用主网预算特定的工程量取费后处理规则""" + print(f"应用{self.software_type.name}特定的工程量取费后处理规则") + + def post_process_resource_fees(self, output_file: str, project_name: str) -> None: + """应用主网预算特定的人材机合价后处理规则""" + print(f"应用{self.software_type.name}特定的人材机合价后处理规则") + + +class MainGridBillCalculator(CalculatorBase): + """主网清单计算器""" + + def __init__(self): + super().__init__(MAIN_GRID_BILL) + + def create_calculation_strategy(self) -> CalculationStrategy: + """创建主网清单计算策略""" + return MainGridBillCalculationStrategy() + + def set_output_dir(self, output_dir): + """ + 设置计算结果的输出目录 + + :param output_dir: 输出目录的路径 + """ + self.output_dir = output_dir + + def apply_quantity_fee_rules(self) -> None: + """应用主网清单特定的工程量取费规则""" + print(f"应用{self.software_type.name}特定的工程量取费规则 - 对节点进行预处理") + + # 创建预处理函数,这个函数会处理JSON文件中的节点 + def preprocess_nodes_recursive(json_file_path): + try: + # 读取 JSON 文件 + with open(json_file_path, "r", encoding="utf-8") as f: + data = json.load(f) + + # 获取工程量数据 + project_data = data.get("projectData", {}) + quantity_data = project_data.get("quantity", {}) + bill_data = project_data.get("bill", {}) + + print(f"开始预处理工程量和清单节点...") + + # 递归处理所有节点 + def process_node(node): + if isinstance(node, dict): + self.calculation_strategy.preprocess_quantity_fee_node(node) + if "children" in node and isinstance(node["children"], list): + for child in node["children"]: + process_node(child) + + # 处理工程量节点 + process_node(quantity_data) + # 也处理清单节点,因为对于清单工程,清单节点信息也很重要 + process_node(bill_data) + + # 写回文件 + with open(json_file_path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + print(f"已完成对 {json_file_path} 中所有节点的预处理") + except Exception as e: + print(f"预处理节点时出错: {e}") + import traceback + + traceback.print_exc() + + # 返回预处理函数,供 calculate_quantity_fee_tables 调用 + return preprocess_nodes_recursive + + def apply_resource_fee_rules(self) -> None: + """应用主网清单特定的人材机合价规则""" + print(f"应用{self.software_type.name}特定的人材机合价规则") + + def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: + """应用主网清单特定的工程量取费后处理规则""" + print(f"应用{self.software_type.name}特定的工程量取费后处理规则") + + def post_process_resource_fees(self, output_file: str, project_name: str) -> None: + """应用主网清单特定的人材机合价后处理规则""" + print(f"应用{self.software_type.name}特定的人材机合价后处理规则") + + +class DistributionBudgetCalculator(CalculatorBase): + """配网预算计算器""" + + def __init__(self): + super().__init__(DISTRIBUTION_BUDGET) + + def set_output_dir(self, output_dir): + """ + 设置计算结果的输出目录 + + :param output_dir: 输出目录的路径 + """ + self.output_dir = output_dir + + def create_calculation_strategy(self) -> CalculationStrategy: + """创建配网预算计算策略""" + return DistributionBudgetCalculationStrategy() + + def apply_quantity_fee_rules(self) -> None: + """应用配网预算特定的工程量取费规则""" + print(f"应用{self.software_type.name}特定的工程量取费规则") + + def apply_resource_fee_rules(self) -> None: + """应用配网预算特定的人材机合价规则""" + print(f"应用{self.software_type.name}特定的人材机合价规则") + + def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: + """应用配网预算特定的工程量取费后处理规则""" + print(f"应用{self.software_type.name}特定的工程量取费后处理规则") + + def post_process_resource_fees(self, output_file: str, project_name: str) -> None: + """应用配网预算特定的人材机合价后处理规则""" + print(f"应用{self.software_type.name}特定的人材机合价后处理规则") + + +class DistributionBillCalculator(CalculatorBase): + """配网清单计算器""" + + def __init__(self): + super().__init__(DISTRIBUTION_BILL) + + def create_calculation_strategy(self) -> CalculationStrategy: + """创建配网清单计算策略""" + return DistributionBillCalculationStrategy() + + def set_output_dir(self, output_dir): + """ + 设置计算结果的输出目录 + + :param output_dir: 输出目录的路径 + """ + self.output_dir = output_dir + + def apply_quantity_fee_rules(self) -> None: + """应用配网清单特定的工程量取费规则""" + print(f"应用{self.software_type.name}特定的工程量取费规则") + + def apply_resource_fee_rules(self) -> None: + """应用配网清单特定的人材机合价规则""" + print(f"应用{self.software_type.name}特定的人材机合价规则") + + def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: + """应用配网清单特定的工程量取费后处理规则""" + print(f"应用{self.software_type.name}特定的工程量取费后处理规则") + + def post_process_resource_fees(self, output_file: str, project_name: str) -> None: + """应用配网清单特定的人材机合价后处理规则""" + print(f"应用{self.software_type.name}特定的人材机合价后处理规则") + + +class TechnicalRenovationBudgetCalculator(CalculatorBase): + """技改预算计算器""" + + def __init__(self): + super().__init__(TECHNICAL_RENOVATION_BUDGET) + + def create_calculation_strategy(self) -> CalculationStrategy: + """创建技改预算计算策略""" + return TechnicalRenovationBudgetCalculationStrategy() + + def set_output_dir(self, output_dir): + """ + 设置计算结果的输出目录 + + :param output_dir: 输出目录的路径 + """ + self.output_dir = output_dir + + def apply_quantity_fee_rules(self) -> None: + """应用技改预算特定的工程量取费规则""" + print(f"应用{self.software_type.name}特定的工程量取费规则") + + def apply_resource_fee_rules(self) -> None: + """应用技改预算特定的人材机合价规则""" + print(f"应用{self.software_type.name}特定的人材机合价规则") + + def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: + """应用技改预算特定的工程量取费后处理规则""" + print(f"应用{self.software_type.name}特定的工程量取费后处理规则") + + def post_process_resource_fees(self, output_file: str, project_name: str) -> None: + """应用技改预算特定的人材机合价后处理规则""" + print(f"应用{self.software_type.name}特定的人材机合价后处理规则") + + +class TechnicalRenovationBillCalculator(CalculatorBase): + """技改清单计算器""" + + def __init__(self): + super().__init__(TECHNICAL_RENOVATION_BILL) + + def create_calculation_strategy(self) -> CalculationStrategy: + """创建技改清单计算策略""" + return TechnicalRenovationBillCalculationStrategy() + + def set_output_dir(self, output_dir): + """ + 设置计算结果的输出目录 + + :param output_dir: 输出目录的路径 + """ + self.output_dir = output_dir + + def apply_quantity_fee_rules(self) -> None: + """应用技改清单特定的工程量取费规则""" + print(f"应用{self.software_type.name}特定的工程量取费规则") + + def apply_resource_fee_rules(self) -> None: + """应用技改清单特定的人材机合价规则""" + print(f"应用{self.software_type.name}特定的人材机合价规则") + + def post_process_quantity_fees(self, output_file: str, project_name: str) -> None: + """应用技改清单特定的工程量取费后处理规则""" + print(f"应用{self.software_type.name}特定的工程量取费后处理规则") + + def post_process_resource_fees(self, output_file: str, project_name: str) -> None: + """应用技改清单特定的人材机合价后处理规则""" + print(f"应用{self.software_type.name}特定的人材机合价后处理规则") + + +def get_calculator(software_type: SoftwareType) -> CalculatorBase: + """ + 根据软件类型获取对应的计算器 + + Args: + software_type: 软件类型 + + Returns: + CalculatorBase: 计算器实例 + """ + calculators = { + MAIN_GRID_BUDGET.name: MainGridBudgetCalculator(), + MAIN_GRID_BILL.name: MainGridBillCalculator(), + DISTRIBUTION_BUDGET.name: DistributionBudgetCalculator(), + DISTRIBUTION_BILL.name: DistributionBillCalculator(), + TECHNICAL_RENOVATION_BUDGET.name: TechnicalRenovationBudgetCalculator(), + TECHNICAL_RENOVATION_BILL.name: TechnicalRenovationBillCalculator(), + } + + return calculators.get(software_type.name)