def classify_files(folder_path):
    """Copy the files of a folder into four category folders next to it.

    The four target folders are created in the parent directory of
    ``folder_path`` (or emptied when they already exist). Each regular file
    directly inside ``folder_path`` is then copied, by file-name rule, to:

    - ``xml_data``: names starting with 费用表, 费用集, 取费表, 线路特征 or
      项目划分 (except those starting with 项目划分_取费_)
    - ``xml_data1.5``: names starting with 项目划分_取费_
    - ``xml_data2``: names starting with 安装_工程量, or containing 工程量精度
      or 工程属性
    - ``xml_data3``: every other file

    Sub-directories of ``folder_path`` are ignored. Files are copied, never
    moved. A failed copy is reported and skipped.

    Args:
        folder_path: Path of the input folder.

    Returns:
        True when classification ran, False when the input folder is missing,
        is not a directory, or is itself one of the target folders.
    """
    import shutil
    from pathlib import Path

    source_dir = Path(folder_path)
    # is_dir() also rejects a path that exists but is a plain file, which
    # would otherwise fail later in iterdir().
    if not source_dir.is_dir():
        print(f"错误:文件夹 '{folder_path}' 不存在")
        return False

    parent_dir = source_dir.parent
    folders = {
        'xml_data': parent_dir / 'xml_data',
        'xml_data1.5': parent_dir / 'xml_data1.5',
        'xml_data2': parent_dir / 'xml_data2',
        'xml_data3': parent_dir / 'xml_data3',
    }

    # The targets are emptied before copying; refuse to run when the source
    # is one of them, otherwise the input files would be deleted.
    source_resolved = source_dir.resolve()
    if any(target.resolve() == source_resolved for target in folders.values()):
        print(f"错误:输入文件夹 '{folder_path}' 与输出文件夹相同,已取消操作")
        return False

    print("开始清空输出文件夹...")
    for target_dir in folders.values():
        if target_dir.exists():
            print(f"清空文件夹: {target_dir}")
            for item in target_dir.iterdir():
                # rmtree refuses symlinks, so only real directories go
                # through it; everything else is unlinked.
                if item.is_dir() and not item.is_symlink():
                    shutil.rmtree(item)
                else:
                    item.unlink()
        else:
            target_dir.mkdir(parents=True, exist_ok=True)
            print(f"创建文件夹: {target_dir}")
    print("输出文件夹清空完成,开始复制分类文件...")

    # 项目划分_取费_ is tested first, so the generic 项目划分 prefix below
    # only sees the remaining 项目划分 files.
    primary_prefixes = ('费用表', '费用集', '取费表', '线路特征', '项目划分')

    copied_files = 0
    for file_path in source_dir.iterdir():
        if not file_path.is_file():
            continue
        filename = file_path.name

        if filename.startswith('项目划分_取费_'):
            dest_folder = folders['xml_data1.5']
        elif filename.startswith(primary_prefixes):
            dest_folder = folders['xml_data']
        elif (filename.startswith('安装_工程量')
              or '工程量精度' in filename
              or '工程属性' in filename):
            dest_folder = folders['xml_data2']
        else:
            dest_folder = folders['xml_data3']

        try:
            shutil.copy2(str(file_path), str(dest_folder / filename))
            copied_files += 1
        except OSError as e:
            print(f"复制文件 {filename} 时出错: {e}")

    print(f"\n文件复制分类完成!共处理 {copied_files} 个文件")
    print("输出文件夹位置:")
    for name, path in folders.items():
        file_count = sum(1 for f in path.iterdir() if f.is_file())
        print(f" {name}: {path} ({file_count} 个文件)")
    return True
def list_target_xmls(
        dir_path: str,
        keyword: str,
        recursive: bool = False,
        fullpath: bool = False):
    """List the files under ``dir_path`` whose name matches ``{keyword}*.xml``.

    The match is case-insensitive and ``keyword`` is taken literally (regex
    metacharacters in it are escaped).

    Args:
        dir_path: Directory to search.
        keyword: Required file-name prefix, e.g. "安装_工程量".
        recursive: Search sub-directories as well when True.
        fullpath: Return resolved absolute paths when True, bare file names
            when False.

    Returns:
        A list of strings sorted case-insensitively.

    Raises:
        NotADirectoryError: ``dir_path`` is not an existing directory.
    """
    import re
    from pathlib import Path

    pattern = re.compile(rf'^{re.escape(keyword)}.*\.xml$', re.IGNORECASE)

    base = Path(dir_path)
    if not base.is_dir():
        raise NotADirectoryError(f"目录不存在或不可用: {dir_path}")

    candidates = base.rglob('*') if recursive else base.iterdir()
    results = [
        str(p.resolve() if fullpath else p.name)
        for p in candidates
        if p.is_file() and pattern.match(p.name)
    ]
    # The raw string is a tie-breaker so entries that differ only by case
    # come out in a stable order regardless of directory listing order.
    results.sort(key=lambda s: (s.lower(), s))
    return results
def filter_files_by_display_names(xml_file_path, files_list, prefix="费用表_"):
    """Keep only the files whose name is listed in a classification XML.

    Every non-empty ``显示名称`` attribute found anywhere in ``xml_file_path``
    is collected. A path from ``files_list`` is kept when its base name
    matches ``{prefix}<name>.xml`` (case-insensitive) and ``<name>`` is one
    of the collected display names.

    Args:
        xml_file_path: Path of the XML file holding the display names.
        files_list: Candidate file paths to filter.
        prefix: Literal file-name prefix placed before the display name.

    Returns:
        A tuple ``(matched_files, matched_display_names)``. ``matched_files``
        keeps the order of ``files_list``; ``matched_display_names`` keeps
        the order (and any repetition) of the names in the XML. When the XML
        cannot be read or parsed the error is printed and ``([], [])`` is
        returned, so callers can always unpack two values.
    """
    import os
    import re
    import xml.etree.ElementTree as ET

    try:
        root = ET.parse(xml_file_path).getroot()
        display_names = [
            name
            for name in (elem.get('显示名称') for elem in root.iter())
            if name
        ]
    except Exception as e:  # best-effort: any read/parse failure yields empty results
        print(f"解析XML文件时出错: {e}")
        return [], []

    pattern = re.compile(rf'{re.escape(prefix)}(.*?)\.xml$', re.IGNORECASE)
    known_names = set(display_names)  # O(1) membership inside the loop

    matched_files = []
    matched_names = set()
    for file_path in files_list:
        match = pattern.search(os.path.basename(file_path))
        if match and match.group(1) in known_names:
            matched_files.append(file_path)
            matched_names.add(match.group(1))

    matched_display_names = [name for name in display_names if name in matched_names]
    return matched_files, matched_display_names
def extract_table_name_from_xml_file(file_path):
    """Return the ``名称`` attribute of the first DataDefs/dataset/table element.

    The element is looked up with the path ``.//DataDefs/dataset/table``
    relative to the document root.

    Args:
        file_path (str): Path of the XML file.

    Returns:
        The attribute value as a string, or None when the element or the
        attribute is missing, the file does not exist, or the XML cannot be
        parsed. Failures are reported with ``print`` rather than raised.
    """
    import xml.etree.ElementTree as ET

    try:
        root = ET.parse(file_path).getroot()
        table_element = root.find('.//DataDefs/dataset/table')
        if table_element is None:
            print("未找到table元素")
            return None
        return table_element.get('名称')
    except ET.ParseError as e:
        print(f"XML解析错误: {e}")
        return None
    except FileNotFoundError:
        print(f"文件未找到: {file_path}")
        return None
    except Exception as e:
        print(f"处理XML时发生错误: {e}")
        return None
###########################费用表#################################
# 根据分类信息,没有:变量费用、常驻费用、其他
# projectData => projectCost
def xml_to_json(xml_content):
    """Convert XML text into a list of nested dicts, one per top-level row.

    For every ``table`` element below the document root, the direct ``tr``
    children of its ``trs`` child are converted with ``process_tr_element``;
    rows that convert to an empty/falsy value are skipped.

    Args:
        xml_content: The XML document as a string.

    Returns:
        A list of converted rows, in document order. An empty list is
        returned (after printing the error) when parsing or conversion fails.
    """
    import xml.etree.ElementTree as ET

    try:
        root = ET.fromstring(xml_content)
        result = []
        for table in root.findall('.//table'):
            trs = table.find('./trs')
            if trs is None:
                continue
            for tr in trs.findall('./tr'):
                tr_data = process_tr_element(tr)
                if tr_data:
                    result.append(tr_data)
        return result
    except ET.ParseError as e:
        print(f"XML解析错误: {e}")
        return []
    except Exception as e:
        print(f"处理错误: {e}")
        return []
def process_tr_element(tr_element):
    """Recursively turn a ``tr`` element into a dict.

    The dict holds the element's attributes. Direct ``tr`` children are
    converted the same way and, when at least one of them yields a non-empty
    dict, stored as a list under the ``children`` key. Returns None when
    ``tr_element`` is None.
    """
    if tr_element is None:
        return None

    node = dict(tr_element.attrib)

    # Children that convert to an empty dict are dropped.
    nested = [
        converted
        for converted in map(process_tr_element, tr_element.findall('./tr'))
        if converted
    ]
    if nested:
        node['children'] = nested
    return node
def xml_file_to_json(file_path):
    """Read a UTF-8 XML file and convert its content with ``xml_to_json``.

    Any error while reading or converting is printed and an empty list is
    returned instead.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            text = handle.read()
        converted = xml_to_json(text)
    except Exception as e:
        print(f"文件读取错误: {e}")
        return []
    return converted
# def save_json_to_file(json_data, output_file):
# """
# 将JSON数据保存到文件
# """
# try:
# with open(output_file, 'w', encoding='utf-8') as file:
# json.dump(json_data, file, ensure_ascii=False, indent=4)
# print(f"JSON数据已保存到: {output_file}")
# except Exception as e:
# print(f"保存文件错误: {e}")
# 保存结果
# save_json_to_file(result, "output.json")
###########################取费表#################################
# projectData => costSetting
def name_TypeList_info(xml_path):
    """Return the ``值`` attribute of the first ``tr`` whose ``名称`` is "名称".

    Instead of raising, a fixed message string is returned when the file is
    missing, the XML is malformed, or no such ``tr`` element exists. The
    result is None when the element exists but has no ``值`` attribute.
    """
    import xml.etree.ElementTree as ET

    try:
        document_root = ET.parse(xml_path).getroot()
    except FileNotFoundError:
        return "XML文件未找到"
    except ET.ParseError:
        return "XML解析错误"

    name_row = document_root.find(".//tr[@名称='名称']")
    if name_row is None:
        return "未找到指定的tr元素"
    return name_row.get('值')
###########################项目划分#################################
# projectData => projectDivision
def pd_get_table_names_from_xml(file_path):
    """Collect the ``名称`` attribute of every ``table`` element in an XML file.

    Tables are searched with ``.//table``; tables whose ``名称`` attribute is
    missing or empty are skipped.

    Args:
        file_path (str): Path of the XML file.

    Returns:
        list: The table names in document order, or an empty list (after
        printing the error) when the file cannot be read or parsed.
    """
    import xml.etree.ElementTree as ET

    try:
        tree = ET.parse(file_path)
        names = (table.get('名称') for table in tree.findall('.//table'))
        return [name for name in names if name]
    except Exception as e:
        print(f"错误: {e}")
        return []
def build_project_division(names):
    """Build a skeleton mapping ``{"项目划分_<name>": {<name>: None}}`` for each name."""
    return {f"项目划分_{entry}": {entry: None} for entry in names}
def create_mapping(table_names, files_list):
    """Map each table name to the first file path ending in ``项目划分_<name>.xml``.

    Table names without a matching path are left out of the result.
    """
    mapping = {}
    for table_name in table_names:
        wanted_suffix = f"项目划分_{table_name}.xml"
        # First path in list order wins, exactly like a loop with break.
        hit = next(
            (candidate for candidate in files_list if candidate.endswith(wanted_suffix)),
            None,
        )
        if hit is not None:
            mapping[table_name] = hit
    return mapping
###########################费用预览(项目划分_取费)#################################
# projectData => expensePreview
def parse_costs_from_xml_file(xml_path):
    """Read the cost values of the first ``table`` element of an XML file.

    The table's ``props/tr`` children define an ``id -> name`` mapping. The
    ``cost`` attribute of the first ``trs/tr`` row is a comma-separated list
    of ``id:value`` items; items without a colon are ignored and an empty
    value becomes None.

    Args:
        xml_path: Path of the XML file.

    Returns:
        A dict ``{"GUID": "{<table name attribute>}", "children": [...]}``.
        ``children`` has one ``{"id": <prop name>, "cost": <value or None>}``
        entry per prop, in prop order. Note that the ``id`` field carries the
        prop's *name*. ``cost`` is None when the row has no value for the
        prop's id.

    Raises:
        ValueError: The ``table``, ``props`` or ``trs/tr`` element, or the
            ``cost`` attribute, is missing.
        xml.etree.ElementTree.ParseError: The file is not well-formed XML.
        OSError: The file cannot be opened.
    """
    import xml.etree.ElementTree as ET

    root = ET.parse(xml_path).getroot()

    table_elem = root.find('.//table')
    if table_elem is None:
        raise ValueError("未找到 <table> 元素")
    guid = table_elem.get('name', '')

    # A missing <props> is reported like the other structural errors instead
    # of failing with an AttributeError on None.
    props_elem = table_elem.find('props')
    if props_elem is None:
        raise ValueError("未找到 <props> 元素")
    id_to_name = {}
    for tr in props_elem.findall('tr'):
        tr_id = tr.get('id')
        tr_name = tr.get('name')
        if tr_id is not None and tr_name is not None:
            id_to_name[tr_id] = tr_name

    tr_elem = table_elem.find('trs/tr')
    if tr_elem is None:
        raise ValueError("未找到 <trs>/<tr> 元素")
    cost_str = tr_elem.get('cost')
    if cost_str is None:
        raise ValueError("在 <tr> 元素中未找到 'cost' 属性")

    cost_map = {}
    for item in cost_str.split(','):
        if ':' not in item:
            continue
        key, value = item.split(':', 1)
        cost_map[key] = value if value else None

    return {
        "GUID": f"{{{guid}}}",
        "children": [
            {"id": tr_name, "cost": cost_map.get(tr_id)}
            for tr_id, tr_name in id_to_name.items()
        ],
    }
###########################工程量#################################
# projectData => projectDivision
# 更安全的版本,不依赖eval验证
def extract_file_paths_with_access_path_safe(data, target_key="_文件名称", root_var="data"):
    """Find every ``target_key`` entry in nested dicts/lists and describe where it is.

    The structure is walked depth-first. For each dict that contains
    ``target_key`` one record is produced; the value stored under
    ``target_key`` itself is not descended into.

    Args:
        data: Nested structure of dicts and lists to search.
        target_key: Dict key whose values are collected.
        root_var: Variable name used as the prefix of the access path.

    Returns:
        A list of dicts with the keys:
        - ``file_path``: the value stored under ``target_key``
        - ``access_path``: subscript expression such as
          ``data['a'][0]['_文件名称']``
        - ``path_parts``: the individual ``[...]`` segments of that path
    """
    results = []

    def _walk(obj, path_parts):
        if isinstance(obj, dict):
            if target_key in obj:
                # repr() yields a valid Python literal even for keys that
                # contain quotes or are not strings, so the path can be
                # parsed back reliably.
                full_parts = path_parts + [f"[{target_key!r}]"]
                results.append({
                    "file_path": obj[target_key],
                    "access_path": root_var + ''.join(full_parts),
                    "path_parts": full_parts,
                })
            for key, value in obj.items():
                if key == target_key:
                    continue
                _walk(value, path_parts + [f"[{key!r}]"])
        elif isinstance(obj, list):
            for index, item in enumerate(obj):
                _walk(item, path_parts + [f"[{index}]"])

    _walk(data, [])
    return results
def parse_xml_to_json(xml_content: str, from_file: bool = False):
    """Convert the "安装" table of an XML document into a list of dicts.

    Rows of the table named "人材机" that have an ``id`` are indexed by that
    id (with the attribute ``_s`` renamed to ``s``). Then every ``tr``
    descendant of the table named "安装" becomes one output item holding the
    row's attributes plus a ``children`` list. That list has one
    ``{"type": ..., "children": [...]}`` entry per ``props`` descendant with
    a non-empty ``type``:

    - for type "40000016" each ``tr`` below the props is replaced by the
      "人材机" row whose id equals the tr's ``s`` attribute (unknown or
      missing references are skipped);
    - for any other type the tr's own attributes are used.

    NOTE(review): rows are collected with ``.//tr``, so ``tr`` elements nested
    inside ``props`` also show up as top-level items — confirm this is intended.

    Args:
        xml_content: XML text, or a file path when ``from_file`` is True.
        from_file: Treat ``xml_content`` as a file path.

    Returns:
        The list of converted rows (empty when there is no "安装" table).

    Raises:
        xml.etree.ElementTree.ParseError: The XML is not well-formed.
        OSError: ``from_file`` is True and the file cannot be opened.
    """
    import xml.etree.ElementTree as ET

    if from_file:
        root = ET.parse(xml_content).getroot()
    else:
        root = ET.fromstring(xml_content)

    # Index of 人材机 rows by id for quick lookup.
    material_dict = {}
    material_table = root.find(".//table[@名称='人材机']")
    if material_table is not None:
        for tr in material_table.findall(".//tr"):
            material_id = tr.get("id")
            if material_id:
                material_dict[material_id] = {
                    ("s" if attr == "_s" else attr): value
                    for attr, value in tr.attrib.items()
                }

    result = []
    install_table = root.find(".//table[@名称='安装']")
    if install_table is None:
        return result

    for tr in install_table.findall(".//tr"):
        main_item = dict(tr.attrib)
        children = []
        for props in tr.findall(".//props"):
            props_type = props.get("type")
            if not props_type:
                continue
            child_rows = props.findall(".//tr")
            if props_type == "40000016":
                # Copy each material so output nodes never share one dict;
                # mutating one entry must not affect the others.
                entries = [
                    dict(material_dict[ref])
                    for ref in (child.get("s") for child in child_rows)
                    if ref and ref in material_dict
                ]
            else:
                entries = [dict(child.attrib) for child in child_rows]
            children.append({"type": props_type, "children": entries})
        main_item["children"] = children
        result.append(main_item)
    return result
def file_exists_simple(file_path):
    """Return 1 when ``file_path`` is an existing regular file, otherwise 0."""
    import os

    return 1 if os.path.isfile(file_path) else 0
def get_by_strpath(data, path: str, replace_with=None):
    """Read (and optionally replace) a value addressed by a subscript path.

    ``path`` looks like ``data['a'][0]['b']``: an optional leading variable
    name followed by ``[...]`` segments holding either a quoted string key or
    an integer index (negative allowed). Whitespace around segments is
    ignored. The path is parsed with a regular expression and
    ``ast.literal_eval``; no code is executed.

    Args:
        data: The root container the path is applied to.
        path: The subscript path string.
        replace_with: When not None, the addressed value (which must be a
            str) is replaced in place by a deep copy of this list.

    Returns:
        Without ``replace_with``: the value found at ``path``.
        With ``replace_with``: the new list object that was written into
        ``data`` (a deep copy of ``replace_with``).

    Raises:
        TypeError: ``path`` is not a str; or, when replacing, the current
            value is not a str, ``replace_with`` is not a list, or the parent
            container has the wrong type for the last segment.
        ValueError: The path contains text that is not a valid segment, or a
            replacement was requested for an empty path.
        IndexError: An integer segment could not be applied.
        KeyError: A string-key segment could not be applied.
    """
    import ast
    import copy
    import re

    if not isinstance(path, str):
        raise TypeError("path 必须是 str")

    s = path.strip()
    # Drop the leading root variable name ("data" or any other identifier,
    # matching the configurable root name used when paths are generated).
    root_name = re.match(r"[A-Za-z_]\w*", s)
    if root_name:
        s = s[root_name.end():]

    cur = data
    pos = 0
    parent = None
    last_selector = None  # ("idx", i) or ("key", k)

    token_re = re.compile(r"""
        \[\s*
        (?:
            (?P<q>'(?:\\.|[^\\'])*'|"(?:\\.|[^\\"])*")   # quoted string key
          | (?P<idx>-?\d+)                                # integer index
        )
        \s*\]
    """, re.VERBOSE)

    for m in token_re.finditer(s):
        # Only whitespace may appear between two segments.
        junk = s[pos:m.start()].strip()
        if junk:
            raise ValueError(f"无法解析路径,存在非法片段: {junk!r}")
        pos = m.end()
        parent = cur
        if m.group("idx") is not None:
            idx = int(m.group("idx"))
            try:
                cur = cur[idx]
            except Exception as e:
                raise IndexError(f"索引 {idx} 访问失败;当前类型为 {type(parent).__name__}") from e
            last_selector = ("idx", idx)
        else:
            key = ast.literal_eval(m.group("q"))  # safely unquote the key
            try:
                cur = cur[key]
            except Exception as e:
                raise KeyError(f"键 {key!r} 访问失败;当前类型为 {type(parent).__name__}") from e
            last_selector = ("key", key)

    if s[pos:].strip():
        raise ValueError("路径末尾存在无法解析的内容")

    if replace_with is None:
        return cur

    if last_selector is None:
        raise ValueError("路径为空,无法替换根节点;请提供至少一个 [] 访问片段")
    if not isinstance(cur, str):
        raise TypeError(f"替换前的值必须是 str,当前为 {type(cur).__name__}")
    if not isinstance(replace_with, list):
        raise TypeError("replace_with 必须是 list")

    new_list = copy.deepcopy(replace_with)  # never share the caller's list
    kind, sel = last_selector
    if kind == "idx":
        if not isinstance(parent, list):
            raise TypeError(f"父节点应为 list,但为 {type(parent).__name__}")
    elif not isinstance(parent, dict):
        raise TypeError(f"父节点应为 dict,但为 {type(parent).__name__}")
    parent[sel] = new_list
    return new_list
###########################线路特征段#################################
# projectData => 线路特征段
def extract_title_from_path(path: str) -> str:
    """Extract the title part of a file name.

    The title is the text after the first underscore of the base name, with
    the extension and an optional trailing ``{digits}`` group removed.
    Example: ``'.../线路特征_土质比例{1272}.xml'`` -> ``'土质比例'``.

    Raises:
        ValueError: The base name has no underscore followed by a title.
    """
    import os
    import re

    name = os.path.basename(path)
    stem = os.path.splitext(name)[0]
    m = re.match(r"^[^_]*_(.+?)(?:\{\d+\})?$", stem)
    if not m:
        raise ValueError(f"无法从文件名提取:{name}")
    return m.group(1).strip()
# 修复转换后xml的格式问题
def process_xml_file(input_file_path, output_file_path=None):
    """Clean irregular text in an XML file and write the result to a new file.

    Every element text, tail text and attribute value is rewritten as follows:

    1. each ``:`` becomes ``_``;
    2. a parenthesised comparison such as ``(≥10)`` becomes ``_大于等于10``
       (≥, ≤, >, < are spelled out and the parentheses dropped);
    3. ``A(x)`` becomes ``A_x``.

    Both ASCII and full-width parentheses are recognised. When the input is
    not well-formed XML, the same rewriting is applied to the raw file text
    instead (this may not keep the XML structure intact).

    Args:
        input_file_path: Path of the XML file to clean.
        output_file_path: Where to write the result. Defaults to
            ``<input name>_processed.xml`` next to the input file.

    Returns:
        The path of the file that was written.

    Raises:
        Exception: Any failure while reading, processing or writing, wrapped
            with a descriptive message (the original error is chained).
    """
    import os
    import re
    import xml.etree.ElementTree as ET

    symbol_mapping = {
        '≥': '大于等于',
        '≤': '小于等于',
        '>': '大于',
        '<': '小于',
    }
    # The parentheses are matched literally and are not capture groups:
    # group 1 is the symbol / the text before the bracket, group 2 the content.
    comparison_re = re.compile(r'[(（](≥|≤|>|<)(.*?)[)）]')
    bracket_re = re.compile(r'([^()（）\s]+)[(（]([^)）\s]+)[)）]')

    def process_text_content(text):
        """Apply the three rewriting rules to one piece of text."""
        if not text:
            return text
        processed = text.replace(":", "_")
        processed = comparison_re.sub(
            lambda m: f'_{symbol_mapping[m.group(1)]}{m.group(2)}', processed)
        return bracket_re.sub(r'\1_\2', processed)

    def process_xml_element(element):
        """Rewrite text, attributes and child tails of an element, recursively."""
        if element.text:
            element.text = process_text_content(element.text)
        # Iterate over a snapshot because the attributes are updated in place.
        for attr_name, attr_value in list(element.attrib.items()):
            element.set(attr_name, process_text_content(attr_value))
        for child in element:
            process_xml_element(child)
            if child.tail:
                child.tail = process_text_content(child.tail)

    try:
        print("----------清洗文件----------")
        if output_file_path is None:
            stem = os.path.splitext(os.path.basename(input_file_path))[0]
            output_file_path = os.path.join(
                os.path.dirname(input_file_path), f"{stem}_processed.xml")

        try:
            tree = ET.parse(input_file_path)
            process_xml_element(tree.getroot())
            tree.write(output_file_path, encoding='utf-8', xml_declaration=True)
        except ET.ParseError as e:
            print(f"XML解析错误: {e}")
            print("尝试使用替代方法处理文件...")
            # Fallback: treat the whole file as plain text.
            with open(input_file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            with open(output_file_path, 'w', encoding='utf-8') as f:
                f.write(process_text_content(content))
            print("使用文本模式处理完成,但可能无法完全保持XML结构")

        print("XML文件处理完成!")
        print(f"输入文件: {input_file_path}")
        print(f"输出文件: {output_file_path}")
        print("----------清洗结束----------")
        return output_file_path
    except Exception as e:
        raise Exception(f"处理文件时发生错误: {e}") from e
###########################工程属性#################################
# projectData => projectInfo
def xml_to_dict(xml_path: str):
    """Parse an XML file into a ``{名称: 值}`` dict built from its ``tr`` elements.

    Every ``tr`` element in the document that has a ``名称`` attribute
    contributes one entry; the value is its ``值`` attribute, or None when
    that attribute is absent. When a name occurs more than once the last
    occurrence wins.

    Args:
        xml_path: Path of the XML file.

    Returns:
        The name-to-value dict.

    Raises:
        ValueError: The file is not well-formed XML.
        FileNotFoundError: The file does not exist.
        RuntimeError: Any other failure while processing the file.
    """
    import xml.etree.ElementTree as ET

    try:
        root = ET.parse(xml_path).getroot()
        result_dict = {}
        for tr in root.findall('.//tr'):
            name = tr.get('名称')
            if name is not None:
                result_dict[name] = tr.get('值')
        return result_dict
    except ET.ParseError as e:
        raise ValueError(f"XML解析错误: {e}") from e
    except FileNotFoundError as e:
        raise FileNotFoundError(f"文件未找到: {xml_path}") from e
    except Exception as e:
        raise RuntimeError(f"处理XML文件时发生错误: {e}") from e